KYLIN-1839: support placing the Kylin MR lib directory (kylin.job.mr.lib.dir) in HDFS Signed-off-by: terry <hzfen...@corp.netease.com> Signed-off-by: Yang Li <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4473d710 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4473d710 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4473d710 Branch: refs/heads/master-hbase1.x Commit: 4473d71011cc0e652eccf4f80269828caa5d3c73 Parents: d28835f Author: terry <hzfen...@corp.netease.com> Authored: Tue Oct 11 17:33:45 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Wed Oct 19 07:30:13 2016 +0800 ---------------------------------------------------------------------- .../engine/mr/common/AbstractHadoopJob.java | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4473d710/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index a138eec..bbb1711 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -252,14 +253,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { // for KylinJobMRLibDir String mrLibDir = kylinConf.getKylinJobMRLibDir(); if (!StringUtils.isBlank(mrLibDir)) { - File dirFileMRLIB = new File(mrLibDir); 
- if (dirFileMRLIB.exists()) { - if (kylinDependency.length() > 0) - kylinDependency.append(","); + if(kylinDependency.length() > 0) { + kylinDependency.append(","); + } kylinDependency.append(mrLibDir); - } else { - logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!"); - } } setJobTmpJarsAndFiles(job, kylinDependency.toString()); @@ -300,21 +297,27 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { try { Configuration jobConf = job.getConfiguration(); FileSystem fs = FileSystem.getLocal(jobConf); + FileSystem hdfs = FileSystem.get(jobConf); StringBuilder jarList = new StringBuilder(); StringBuilder fileList = new StringBuilder(); for (String fileName : fNameList) { Path p = new Path(fileName); - if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, fileName); + FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs); + if(!current.exists(p)) { + logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!"); + continue; + } + if (current.getFileStatus(p).isDirectory()) { + appendTmpDir(job, current, fileName); continue; } StringBuilder list = (p.getName().endsWith(".jar")) ? 
jarList : fileList; if (list.length() > 0) list.append(","); - list.append(fs.getFileStatus(p).getPath().toString()); + list.append(current.getFileStatus(p).getPath()); } appendTmpFiles(fileList.toString(), jobConf); @@ -324,13 +327,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } - private void appendTmpDir(Job job, String tmpDir) { + private void appendTmpDir(Job job, FileSystem fs, String tmpDir) { if (StringUtils.isBlank(tmpDir)) return; try { Configuration jobConf = job.getConfiguration(); - FileSystem fs = FileSystem.getLocal(jobConf); FileStatus[] fList = fs.listStatus(new Path(tmpDir)); StringBuilder jarList = new StringBuilder(); @@ -339,7 +341,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { for (FileStatus file : fList) { Path p = file.getPath(); if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, p.toString()); + appendTmpDir(job, fs, p.toString()); continue; } @@ -622,3 +624,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } +