KYLIN-1839 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa1550c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa1550c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa1550c9

Branch: refs/heads/master-hbase1.x
Commit: aa1550c9ecfb1c047c107fa51187a51e958ab189
Parents: 4473d71
Author: Yang Li <liy...@apache.org>
Authored: Wed Oct 19 08:19:35 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Wed Oct 19 08:19:35 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java | 44 +++++++++++---------
 1 file changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa1550c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index bbb1711..f70e3bb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -80,7 +79,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
     protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
     protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
-// @Deprecated
+    // @Deprecated
     protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME);
     protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID);
     protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
@@ -253,10 +252,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         // for KylinJobMRLibDir
         String mrLibDir = kylinConf.getKylinJobMRLibDir();
         if (!StringUtils.isBlank(mrLibDir)) {
-            if(kylinDependency.length() > 0) {
-                kylinDependency.append(",");
-            }
-            kylinDependency.append(mrLibDir);
+            if (kylinDependency.length() > 0) {
+                kylinDependency.append(",");
+            }
+            kylinDependency.append(mrLibDir);
         }

         setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -296,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {

         try {
             Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.getLocal(jobConf);
+            FileSystem localfs = FileSystem.getLocal(jobConf);
             FileSystem hdfs = FileSystem.get(jobConf);

             StringBuilder jarList = new StringBuilder();
@@ -304,20 +303,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {

             for (String fileName : fNameList) {
                 Path p = new Path(fileName);
-                FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
-                if(!current.exists(p)) {
-                    logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!");
+                if (p.isAbsolute() == false) {
+                    logger.warn("The directory of kylin dependency '" + fileName + "' is not absolute, skip");
+                    continue;
+                }
+                FileSystem fs;
+                if (hdfs.exists(p)) {
+                    fs = hdfs;
+                } else if (localfs.exists(p)) {
+                    fs = localfs;
+                } else {
+                    logger.warn("The directory of kylin dependency '" + fileName + "' does not exist, skip");
                     continue;
                 }
-                if (current.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, current, fileName);
+
+                if (fs.getFileStatus(p).isDirectory()) {
+                    appendTmpDir(job, fs, p);
                     continue;
                 }

                 StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
                 if (list.length() > 0)
                     list.append(",");
-                list.append(current.getFileStatus(p).getPath());
+                list.append(fs.getFileStatus(p).getPath());
             }

             appendTmpFiles(fileList.toString(), jobConf);
@@ -327,13 +335,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }

-    private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
-        if (StringUtils.isBlank(tmpDir))
-            return;
-
+    private void appendTmpDir(Job job, FileSystem fs, Path tmpDir) {
         try {
             Configuration jobConf = job.getConfiguration();
-            FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+            FileStatus[] fList = fs.listStatus(tmpDir);

             StringBuilder jarList = new StringBuilder();
             StringBuilder fileList = new StringBuilder();
@@ -341,7 +346,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {

             for (FileStatus file : fList) {
                 Path p = file.getPath();
                 if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, fs, p.toString());
+                    appendTmpDir(job, fs, p);
                     continue;
                 }
@@ -624,4 +629,3 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }

 }
-
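
Reviewer note: the behavioral change is in setJobTmpJarsAndFiles(). Instead of choosing a file system by checking for the "hdfs" URI scheme prefix (HdfsConstants.HDFS_URI_SCHEME), the patch skips relative paths, probes HDFS first, then falls back to the local file system, and skips entries that exist on neither. Below is a minimal standalone sketch of that lookup order; the class name DependencyPathResolver and the main() driver are illustrative only and are not part of the commit.

    // Standalone sketch -- NOT part of the commit -- of the lookup order the
    // patched setJobTmpJarsAndFiles() applies to each kylin dependency entry:
    // skip relative paths, prefer HDFS, fall back to the local file system,
    // and skip entries that exist on neither.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DependencyPathResolver {

        // Returns the file system that actually holds the dependency path,
        // or null if the path is relative or missing on both file systems.
        public static FileSystem resolve(Configuration conf, String fileName) throws IOException {
            FileSystem localfs = FileSystem.getLocal(conf);
            FileSystem hdfs = FileSystem.get(conf);

            Path p = new Path(fileName);
            if (!p.isAbsolute()) {
                System.out.println("skip non-absolute dependency: " + fileName);
                return null;
            }
            if (hdfs.exists(p)) {
                return hdfs; // prefer HDFS, mirroring the patch
            }
            if (localfs.exists(p)) {
                return localfs; // otherwise fall back to the local file system
            }
            System.out.println("skip missing dependency: " + fileName);
            return null;
        }

        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            for (String fileName : args) {
                FileSystem fs = resolve(conf, fileName);
                if (fs != null) {
                    // same call the patch uses to obtain the fully qualified path
                    System.out.println(fileName + " -> " + fs.getFileStatus(new Path(fileName)).getPath());
                }
            }
        }
    }

Because the existence probe replaces the old scheme-prefix check, dependency paths given without an explicit hdfs:// scheme can still be resolved against the cluster before falling back to the local file system.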