Repository: kylin Updated Branches: refs/heads/KYLIN-2242 71994afb3 -> 639f1f66a (forced update)
KYLIN-2217 use column's identity as path name and add more logging info Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/639f1f66 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/639f1f66 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/639f1f66 Branch: refs/heads/KYLIN-2242 Commit: 639f1f66aef95a2a22bfd61a2e3fcf94646f041a Parents: 7de8aa1 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Jan 22 09:43:10 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Jan 22 16:56:52 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/common/util/HadoopUtil.java | 4 ++++ .../apache/kylin/cube/cli/DictionaryGeneratorCLI.java | 3 +++ .../org/apache/kylin/engine/mr/SortedColumnDFSFile.java | 4 ++++ .../apache/kylin/engine/mr/common/AbstractHadoopJob.java | 2 +- .../kylin/engine/mr/steps/CreateDictionaryJob.java | 9 +++++++-- .../engine/mr/steps/FactDistinctColumnsReducer.java | 11 +++++++---- .../engine/mr/steps/UpdateCubeInfoAfterBuildStep.java | 4 ++-- .../hive/cardinality/HiveColumnCardinalityJob.java | 3 +++ 8 files changed, 31 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index b9ffe38..f242515 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -143,6 +143,10 @@ public class HadoopUtil { } public static Path getFilterOnlyPath(FileSystem fs, Path baseDir, final String filter) throws
IOException { + if (fs.exists(baseDir) == false) { + return null; + } + FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() { @Override public boolean accept(Path path) { http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 3e1ab0d..2e5a38e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -63,11 +63,14 @@ public class DictionaryGeneratorCLI { if (dictProvider != null) { Dictionary<String> dict = dictProvider.getDictionary(col); if (dict != null) { + logger.debug("Dict for '" + col.getName() + "' has already been built, save it"); cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict); } else { + logger.debug("Dict for '" + col.getName() + "' not pre-built, build it from " + inpTable.toString()); cubeMgr.buildDictionary(cubeSeg, col, inpTable); } } else { + logger.debug("Dict for '" + col.getName() + "' not pre-built, build it from " + inpTable.toString()); cubeMgr.buildDictionary(cubeSeg, col, inpTable); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java index d3f5cdc..f396b5a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java @@ -128,4 +128,8 @@ public class 
SortedColumnDFSFile implements ReadableTable { return comparator; } + @Override + public String toString() { + return dfsPath; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 567c1d0..44686d6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -75,7 +75,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("static-access") public abstract class AbstractHadoopJob extends Configured implements Tool { - protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); + private static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME); protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. 
For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME); http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index e5d053b..be8c305 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -42,9 +42,13 @@ import org.apache.kylin.engine.mr.SortedColumnDFSFile; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CreateDictionaryJob extends AbstractHadoopJob { + private static final Logger logger = LoggerFactory.getLogger(CreateDictionaryJob.class); + @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -68,11 +72,12 @@ public class CreateDictionaryJob extends AbstractHadoopJob { @Override public Dictionary<String> getDictionary(TblColRef col) throws IOException { - Path colDir = new Path(factColumnsInputPath, col.getName()); - FileSystem fs = HadoopUtil.getFileSystem(colDir.toString()); + Path colDir = new Path(factColumnsInputPath, col.getIdentity()); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); if (dictFile == null) { + logger.info("Dict for '" + col.getName() + "' not pre-built."); return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 5d2fb72..8c56bdf 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -124,12 +125,14 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK // normal col col = columnList.get(reducerIdToColumnIndex.get(taskId)); + Preconditions.checkNotNull(col); + // local build dict isReducerLocalBuildDict = config.isReducerLocalBuildDict(); if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder isReducerLocalBuildDict = false; } - if (col != null && isReducerLocalBuildDict) { + if (isReducerLocalBuildDict) { builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); builder.init(null, 0); } @@ -190,7 +193,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } else { byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); // output written to baseDir/colName/-r-00000 (etc) - String fileName = col.getName() + "/"; + String fileName = col.getIdentity() + "/"; mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName); } } @@ -231,7 +234,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private void outputPartitionInfo() throws IOException, InterruptedException { if (col != null) { // output written 
to baseDir/colName/colName.pci-r-00000 (etc) - String partitionFileName = col.getName() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; + String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName); mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName); @@ -241,7 +244,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException { // output written to baseDir/colName/colName.rldict-r-00000 (etc) - String dictFileName = col.getName() + "/" + col.getName() + DICT_FILE_POSTFIX; + String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX; try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) { outputStream.writeUTF(dict.getClass().getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 81d5c42..79fe657 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -80,8 +80,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = 
segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); - Path colDir = new Path(factColumnsInputPath, partitionCol.getName()); - FileSystem fs = HadoopUtil.getFileSystem(colDir.toString()); + Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity()); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); if (outputFile == null) { throw new IOException("fail to find the partition file in base dir: " + colDir); http://git-wip-us.apache.org/repos/asf/kylin/blob/639f1f66/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index ea72b54..f439ccb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -37,6 +37,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column. 
@@ -44,6 +46,7 @@ import org.apache.kylin.metadata.model.TableDesc; * */ public class HiveColumnCardinalityJob extends AbstractHadoopJob { + private static final Logger logger = LoggerFactory.getLogger(HiveColumnCardinalityJob.class); public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; @SuppressWarnings("static-access")