KYLIN-2764 Build UHC Dict Using MR
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ccd07686 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ccd07686 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ccd07686 Branch: refs/heads/2622-2764 Commit: ccd076861415a40926ab42e6f9b71cd5258d4daf Parents: 8de0d52 Author: kangkaisen <kangkai...@meituan.com> Authored: Fri Jul 21 15:22:30 2017 +0800 Committer: kangkaisen <kangkai...@meituan.com> Committed: Mon Sep 4 21:41:50 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 11 +- .../java/org/apache/kylin/cube/CubeManager.java | 82 +++++----- .../apache/kylin/dict/DictionaryGenerator.java | 14 +- .../org/apache/kylin/dict/DictionaryInfo.java | 4 + .../kylin/dict/GlobalDictionaryBuilder.java | 9 +- .../apache/kylin/dict/IDictionaryBuilder.java | 2 +- .../global/SegmentAppendTrieDictBuilder.java | 4 +- .../kylin/dict/DictionaryProviderTest.java | 2 +- .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/engine/mr/BatchCubingJobBuilder2.java | 26 +++- .../kylin/engine/mr/JobBuilderSupport.java | 24 ++- .../engine/mr/common/AbstractHadoopJob.java | 3 + .../kylin/engine/mr/common/BatchConstants.java | 4 + .../engine/mr/steps/CreateDictionaryJob.java | 16 +- .../mr/steps/FactDistinctColumnsReducer.java | 2 +- .../kylin/engine/mr/steps/UHCDictionaryJob.java | 154 +++++++++++++++++++ .../engine/mr/steps/UHCDictionaryMapper.java | 101 ++++++++++++ .../mr/steps/UHCDictionaryPartitioner.java | 30 ++++ .../engine/mr/steps/UHCDictionaryReducer.java | 113 ++++++++++++++ .../test_case_data/sandbox/kylin.properties | 5 + .../dict/ITGlobalDictionaryBuilderTest.java | 4 +- 21 files changed, 553 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 749b515..485bf21 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -899,6 +899,10 @@ abstract public class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.engine.mr.config-override."); } + public Map<String, String> getUHCMRConfigOverride() { + return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override."); + } + public Map<String, String> getSparkConfigOverride() { return getPropertiesByPrefix("kylin.engine.spark-conf."); } @@ -925,9 +929,14 @@ abstract public class KylinConfigBase implements Serializable { //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns public int getUHCReducerCount() { - return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1")); + return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5")); } + public boolean isBuildUHCDictWithMREnabled() { + return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true")); + } + + public boolean isBuildDictInReducerEnabled() { return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 043993c..1ca4b08 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -30,7 +30,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -229,6 +228,7 @@ public class CubeManager implements IRealizationProvider { return null; String builderClass = cubeDesc.getDictionaryBuilderClass(col); + DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, builderClass); @@ -878,6 +878,52 @@ public class CubeManager implements IRealizationProvider { return factDictCols; } + public List<TblColRef> getAllGlobalDictColumns(CubeDesc cubeDesc) { + List<TblColRef> globalDictCols = new ArrayList<TblColRef>(); + List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); + + if (dictionaryDescList == null) { + return globalDictCols; + } + + for (DictionaryDesc dictionaryDesc : dictionaryDescList) { + if (dictionaryDesc.getBuilderClass() != null) { + globalDictCols.add(dictionaryDesc.getColumnRef()); + } + } + return globalDictCols; + } + + //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns + public List<TblColRef> getAllUHCColumns(CubeDesc cubeDesc) { + List<TblColRef> uhcColumns = new ArrayList<TblColRef>(); + uhcColumns.addAll(getAllGlobalDictColumns(cubeDesc)); + uhcColumns.addAll(cubeDesc.getShardByColumns()); + + //handle PK-FK, see getAllDictColumnsOnFact + try { + uhcColumns.retainAll(getAllDictColumnsOnFact(cubeDesc)); + } catch (IOException e) { + throw new RuntimeException("Get all dict columns on fact failed"); + } + + return uhcColumns; + } + + public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { + List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc); + List<TblColRef> uhcColumns = 
getAllUHCColumns(cubeDesc); + int[] uhcIndex = new int[factDictCols.size()]; + + for (int i = 0; i < factDictCols.size(); i++) { + if (uhcColumns.contains(factDictCols.get(i))) { + uhcIndex[i] = 1; + } + } + + return uhcIndex; + } + /** * Calculate the holes (gaps) in segments. * @param cubeName @@ -917,38 +963,4 @@ public class CubeManager implements IRealizationProvider { } return holes; } - - private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; - - //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns - public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { - List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc); - int[] uhcIndex = new int[factDictCols.size()]; - - //add GlobalDictionaryColumns - List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); - if (dictionaryDescList != null) { - for (DictionaryDesc dictionaryDesc : dictionaryDescList) { - if (dictionaryDesc.getBuilderClass() != null - && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { - for (int i = 0; i < factDictCols.size(); i++) { - if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) { - uhcIndex[i] = 1; - break; - } - } - } - } - } - - //add ShardByColumns - Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); - for (int i = 0; i < factDictCols.size(); i++) { - if (shardByColumns.contains(factDictCols.get(i))) { - uhcIndex[i] = 1; - } - } - - return uhcIndex; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index 61a0664..5fdecdb 100644 --- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -70,7 +70,7 @@ public class DictionaryGenerator { ArrayList<String> samples = new ArrayList<String>(nSamples); // init the builder - builder.init(dictInfo, baseId); + builder.init(dictInfo, baseId, null); // add values while (valueEnumerator.moveNext()) { @@ -111,7 +111,7 @@ public class DictionaryGenerator { private String datePattern; @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { this.baseId = baseId; } @@ -152,7 +152,7 @@ public class DictionaryGenerator { private static class TimeDictBuilder implements IDictionaryBuilder { @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { } @Override @@ -176,7 +176,7 @@ public class DictionaryGenerator { TrieDictionaryBuilder builder; @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { this.baseId = baseId; this.builder = new TrieDictionaryBuilder(new StringBytesConverter()); } @@ -200,7 +200,7 @@ public class DictionaryGenerator { TrieDictionaryForestBuilder builder; @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId); } @@ -225,7 +225,7 @@ public class DictionaryGenerator { NumberDictionaryBuilder builder; @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { this.baseId = baseId; this.builder 
= new NumberDictionaryBuilder(); } @@ -249,7 +249,7 @@ public class DictionaryGenerator { NumberDictionaryForestBuilder builder; @Override - public void init(DictionaryInfo info, int baseId) throws IOException { + public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException { builder = new NumberDictionaryForestBuilder(baseId); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java index ae5c0f1..bfb1995 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java @@ -52,6 +52,10 @@ public class DictionaryInfo extends RootPersistentEntity { public DictionaryInfo() { } + public DictionaryInfo(ColumnDesc col, String dataType) { + this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, null); + } + public DictionaryInfo(ColumnDesc col, String dataType, TableSignature input) { this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, input); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 404d53c..8250fed 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -42,14 +42,17 @@ public class GlobalDictionaryBuilder 
implements IDictionaryBuilder { private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); - @Override - public void init(DictionaryInfo dictInfo, int baseId) throws IOException { + public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException { sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn(); lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE); int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); - String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/"; + if (hdfsDir == null) { + //build in Kylin job server + hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); + } + String baseDir = hdfsDir + "resources/GlobalDict" + dictInfo.getResourceDir() + "/"; this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true); this.baseId = baseId; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java index 0934a7d..18bbb07 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java @@ -28,7 +28,7 @@ import org.apache.kylin.common.util.Dictionary; public interface IDictionaryBuilder { /** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. 
*/ - void init(DictionaryInfo info, int baseId) throws IOException; + void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException; /** Add a new value into dictionary, returns it is accepted (not null) or not. */ boolean addValue(String value); http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java index 270deee..d996fb3 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java @@ -38,7 +38,7 @@ public class SegmentAppendTrieDictBuilder implements IDictionaryBuilder { private String sourceColumn; @Override - public void init(DictionaryInfo dictInfo, int baseId) throws IOException { + public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException { sourceColumn = dictInfo.getSourceTable() + "." 
+ dictInfo.getSourceColumn(); int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); @@ -48,7 +48,7 @@ public class SegmentAppendTrieDictBuilder implements IDictionaryBuilder { } //use UUID to make each segment dict in different HDFS dir and support concurrent build //use timestamp to make the segment dict easily to delete - String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/"; + String baseDir = hdfsDir + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/"; this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, false); this.baseId = baseId; http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java index 4b386a7..7e2e218 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java @@ -84,7 +84,7 @@ public class DictionaryProviderTest extends LocalFileMetadataTestCase{ private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception { IDictionaryBuilder builder = DictionaryGenerator.newDictionaryBuilder(type); - builder.init(null, 0); + builder.init(null, 0, null); while (values.hasNext()) { builder.addValue(values.next()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java 
---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 2de3efa..36496fe 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -33,6 +33,7 @@ public final class ExecutableConstants { public static final String SOURCE_RECORDS_SIZE = "source_records_size"; public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; + public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 106077c..41a0a8c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -18,6 +18,7 @@ package org.apache.kylin.engine.mr; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; @@ -31,9 +32,12 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob; import 
org.apache.kylin.engine.mr.steps.SaveStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class BatchCubingJobBuilder2 extends JobBuilderSupport { private static final Logger logger = LoggerFactory.getLogger(BatchCubingJobBuilder2.class); @@ -58,6 +62,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { // Phase 2: Build Dictionary result.addTask(createFactDistinctColumnsStepWithStats(jobId)); + + if (isEnableUHCDictStep()) { + result.addTask(createBuildUHCDictStep(jobId)); + } + result.addTask(createBuildDictionaryStep(jobId)); result.addTask(createSaveStatisticsStep(jobId)); outputSide.addStepPhase2_BuildDictionary(result); @@ -75,13 +84,26 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } + private boolean isEnableUHCDictStep() { + if (!config.getConfig().isBuildUHCDictWithMREnabled()) { + return false; + } + + List<TblColRef> uhcColumns = CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc()); + if (uhcColumns.size() == 0) { + return false; + } + + return true; + } + protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { final int maxLevel = seg.getCubeDesc().getBuildLevel(); // base cuboid step result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps for (int i = 1; i <= maxLevel; i++) { - result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId)); + result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId)); } } @@ -138,7 +160,7 @@ public class BatchCubingJobBuilder2 extends 
JobBuilderSupport { baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob()); -// baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); + // baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); return baseCuboidStep; } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index c1ed345..2a51c89 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -26,6 +26,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; +import org.apache.kylin.engine.mr.steps.UHCDictionaryJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; import org.apache.kylin.engine.mr.steps.MergeDictionaryStep; @@ -81,6 +82,22 @@ public class JobBuilderSupport { return result; } + public MapReduceExecutable createBuildUHCDictStep(String jobId) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY); + result.setMapReduceJobClass(UHCDictionaryJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, 
getDictRootPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Build_UHC_Dict" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + result.setMapReduceParams(cmd.toString()); + result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); + return result; + } + public HadoopShellExecutable createBuildDictionaryStep(String jobId) { // base cuboid job HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); @@ -89,6 +106,7 @@ public class JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId)); buildDictionaryStep.setJobParams(cmd.toString()); buildDictionaryStep.setJobClass(CreateDictionaryJob.class); @@ -104,7 +122,6 @@ public class JobBuilderSupport { CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); - return result; } @@ -176,6 +193,10 @@ public class JobBuilderSupport { return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS; } + public String getDictRootPath(String jobId) { + return getRealizationRootPath(jobId) + "/dict"; + } + // ============================================================================ // static methods also shared by other job flow participant // ---------------------------------------------------------------------------- @@ -203,5 +224,4 @@ public class JobBuilderSupport { } } - } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 081ac93..1100e38 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -112,6 +112,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false) .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + protected static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH). + hasArg().isRequired(false).withDescription("Dict path").create(BatchConstants.ARG_DICT_PATH); + private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; protected static void runJob(Tool job, String[] args) { http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 84ca006..9bd1418 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -65,6 +65,8 @@ public interface BatchConstants { String CFG_OUTPUT_STATISTICS = "statistics"; String CFG_OUTPUT_PARTITION = "partition"; String CFG_MR_SPARK_JOB = "mr.spark.job"; + + String 
CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir"; /** * command line ARGuments @@ -85,6 +87,8 @@ public interface BatchConstants { String ARG_HTABLE_NAME = "htablename"; String ARG_INPUT_FORMAT = "inputformat"; String ARG_LEVEL = "level"; + String ARG_DICT_PATH = "dictPath"; + /** * logger and counter http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 98ebbb4..d64d300 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +36,8 @@ import org.apache.kylin.common.util.ByteBufferBackedInputStream; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cli.DictionaryGeneratorCLI; import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; @@ -55,11 +58,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob { options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_DICT_PATH); parseOptions(options, args); final String cubeName = getOptionValue(OPTION_CUBE_NAME); final String segmentID = 
getOptionValue(OPTION_SEGMENT_ID); final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); + final String dictPath = getOptionValue(OPTION_DICT_PATH); final KylinConfig config = KylinConfig.getInstanceFromEnv(); @@ -72,7 +77,16 @@ public class CreateDictionaryJob extends AbstractHadoopJob { @Override public Dictionary<String> getDictionary(TblColRef col) throws IOException { - Path colDir = new Path(factColumnsInputPath, col.getIdentity()); + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(cubeName); + List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor()); + + Path colDir; + if (uhcColumns.contains(col)) { + colDir = new Path(dictPath, col.getIdentity()); + } else { + colDir = new Path(factColumnsInputPath, col.getIdentity()); + } FileSystem fs = HadoopUtil.getWorkingFileSystem(); Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 7f01c3a..436cf06 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -141,7 +141,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } if (buildDictInReducer) { builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); - builder.init(null, 0); + builder.init(null, 0, null); } logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + 
buildDictInReducer); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java new file mode 100644 index 0000000..485975a --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class UHCDictionaryJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(UHCDictionaryJob.class); + + private boolean isSkipped = false; + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_INPUT_PATH); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + String cubeName = getOptionValue(OPTION_CUBE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + Path input = new 
Path(getOptionValue(OPTION_INPUT_PATH)); + + //add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + attachCubeMetadata(cube, job.getConfiguration()); + + List<TblColRef> uhcColumns = cubeMgr.getAllUHCColumns(cube.getDescriptor()); + int reducerCount = uhcColumns.size(); + + //Note! handle uhc columns is null. + boolean hasUHCValue = false; + for (TblColRef tblColRef : uhcColumns) { + Path path = new Path(input.toString() + "/" + tblColRef.getIdentity()); + if (HadoopUtil.getFileSystem(path).exists(path)) { + FileInputFormat.addInputPath(job, path); + hasUHCValue = true; + } + } + + if (!hasUHCValue) { + isSkipped = true; + return 0; + } + + setJobClasspath(job, cube.getConfig()); + setupMapper(); + setupReducer(output, reducerCount); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); + job.getConfiguration().set(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); + job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false"); + + //8G memory is enough for all global dict, because the input is sequential and we handle global dict slice by slice + job.getConfiguration().set("mapreduce.reduce.memory.mb", "8500"); + job.getConfiguration().set("mapred.reduce.child.java.opts", "-Xmx8g"); + //Copying global dict to working dir in GlobalDictHDFSStore maybe elapsed a long time (Maybe we could improve it) + //Waiting the global dict lock maybe also take a long time. 
+ //So we set 8 hours here + job.getConfiguration().set("mapreduce.task.timeout", "28800000"); + + //allow user specially set config for uhc step + for (Map.Entry<String, String> entry : cube.getConfig().getUHCMRConfigOverride().entrySet()) { + job.getConfiguration().set(entry.getKey(), entry.getValue()); + } + + return waitForCompletion(job); + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + + private void setupMapper() throws IOException { + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(UHCDictionaryMapper.class); + job.setMapOutputKeyClass(SelfDefineSortableKey.class); + job.setMapOutputValueClass(NullWritable.class); + } + + private void setupReducer(Path output, int numberOfReducers) throws IOException { + job.setReducerClass(UHCDictionaryReducer.class); + job.setPartitionerClass(UHCDictionaryPartitioner.class); + job.setNumReduceTasks(numberOfReducers); + + MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class); + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); + + //prevent to create zero-sized default output + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + + deletePath(job.getConfiguration(), output); + } + + @Override + public boolean isSkipped() { + return isSkipped; + } + + public static void main(String[] args) throws Exception { + UHCDictionaryJob job = new UHCDictionaryJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java new file mode 100644 index 0000000..154c624 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class UHCDictionaryMapper extends KylinMapper<NullWritable, Text, SelfDefineSortableKey, NullWritable> { + private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryMapper.class); + + protected int index; + protected DataType type; + + protected Text outputKey = new Text(); + private ByteBuffer tmpBuf; + private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); + + @Override + protected void setup(Context context) throws IOException { + tmpBuf = ByteBuffer.allocate(4096); + + Configuration conf = context.getConfiguration(); + bindCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME)); + List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor()); + + FileSplit fileSplit = (FileSplit) context.getInputSplit(); + String colName = fileSplit.getPath().getParent().getName(); + + for (int i = 0; i < uhcColumns.size(); i++) { + if (uhcColumns.get(i).getIdentity().equalsIgnoreCase(colName)) { + index = i; + break; + } + } + type = 
uhcColumns.get(index).getType(); + + //for debug + logger.info("column name: " + colName); + logger.info("index: " + index); + logger.info("type: " + type); + } + + @Override + public void doMap(NullWritable key, Text value, Context context) throws IOException, InterruptedException { + tmpBuf.clear(); + int size = value.getLength()+ 1; + if (size >= tmpBuf.capacity()) { + tmpBuf = ByteBuffer.allocate(countNewSize(tmpBuf.capacity(), size)); + } + tmpBuf.put(Bytes.toBytes(index)[3]); + tmpBuf.put(value.getBytes(), 0, value.getLength()); + outputKey.set(tmpBuf.array(), 0, tmpBuf.position()); + + sortableKey.init(outputKey, type); + context.write(sortableKey, NullWritable.get()); + } + + private int countNewSize(int oldSize, int dataSize) { + int newSize = oldSize * 2; + while (newSize < dataSize) { + newSize = newSize * 2; + } + return newSize; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java new file mode 100644 index 0000000..5e8ffa6 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.kylin.common.util.BytesUtil; + +public class UHCDictionaryPartitioner extends Partitioner<SelfDefineSortableKey, NullWritable> { + @Override + public int getPartition(SelfDefineSortableKey skey, NullWritable value, int numReduceTasks) { + return BytesUtil.readUnsigned(skey.getText().getBytes(), 0, 1); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java new file mode 100644 index 0000000..070b29b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.IDictionaryBuilder; +import org.apache.kylin.engine.mr.KylinReducer; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +import static org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX; + +public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, NullWritable, NullWritable, BytesWritable> { + private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryReducer.class); + + private IDictionaryBuilder builder; + private TblColRef col; + + private 
MultipleOutputs mos; + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + Configuration conf = context.getConfiguration(); + mos = new MultipleOutputs(context); + + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + CubeDesc cubeDesc = cube.getDescriptor(); + List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cubeDesc); + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + col = uhcColumns.get(taskId); + logger.info("column name: " + col.getIdentity()); + + if (cube.getDescriptor().getShardByColumns().contains(col)) { + //for ShardByColumns + builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); + builder.init(null, 0, null); + } else { + //for GlobalDictionaryColumns + String hdfsDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR); + DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype()); + String builderClass = cubeDesc.getDictionaryBuilderClass(col); + builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass); + builder.init(dictionaryInfo, 0, hdfsDir); + } + } + + @Override + public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { + Text key = skey.getText(); + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + builder.addValue(value); + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + Dictionary<String> dict = builder.build(); + outputDict(col, dict); + } + + private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException { + // output written to baseDir/colName/colName.rldict-r-00000 (etc) + String dictFileName = 
col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX; + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) { + outputStream.writeUTF(dict.getClass().getName()); + dict.write(outputStream); + + mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName); + } + mos.close(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 619bf99..55eb719 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -48,6 +48,8 @@ kylin.storage.url=hbase # Working folder in HDFS, make sure user has the right access to the hdfs directory kylin.env.hdfs-working-dir=/kylin +kylin.env.zookeeper-connect-string=sandbox.hortonworks.com + # HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster:8020 # Leave empty if hbase running on same cluster with hive and mapreduce #kylin.storage.hbase.cluster-fs= @@ -55,6 +57,9 @@ kylin.env.hdfs-working-dir=/kylin kylin.engine.mr.reduce-input-mb=500 +kylin.engine.mr.uhc-config-override.mapreduce.reduce.memory.mb=500 +kylin.engine.mr.uhc-config-override.mapred.reduce.child.java.opts=-Xmx400M + ### JOB ### http://git-wip-us.apache.org/repos/asf/kylin/blob/ccd07686/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java index df2ebf7..c578a57 100644 --- 
a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java @@ -72,7 +72,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { finishLatch.await(); GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder(); - builder.init(dictionaryInfo, 0); + builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); builder.addValue("success"); Dictionary<String> dict = builder.build(); @@ -108,7 +108,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder(); startLatch.countDown(); - builder.init(dictionaryInfo, 0); + builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); for (int i = 0; i < count; i++) { builder.addValue(prefix + i); }