This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 68bffc1 KYLIN-3727 Check if there are no directories, then finish job successfully. 68bffc1 is described below commit 68bffc14a8770fa7e1cf5dbad2c027eb027f7be2 Author: alexandr.sidorchuk <alexandr.sidorc...@apm-consult.com> AuthorDate: Wed Feb 27 22:21:26 2019 +0300 KYLIN-3727 Check if there are no directories, then finish job successfully. --- .../kylin/storage/hbase/steps/BulkLoadJob.java | 29 +++++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java index f0b77aa..998c1a1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java @@ -22,9 +22,13 @@ import java.io.IOException; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -74,10 +78,27 @@ public class BulkLoadJob extends AbstractHadoopJob { newArgs[0] = input; newArgs[1] = tableName; - logger.debug("Start to run LoadIncrementalHFiles"); - int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs); - logger.debug("End to run LoadIncrementalHFiles"); - return ret; + int count = 0; + Path inputPath = new Path(input); + FileSystem fs = HadoopUtil.getFileSystem(inputPath); + FileStatus[] fileStatuses = fs.listStatus(inputPath); + 
for(FileStatus fileStatus: fileStatuses) { + if(fileStatus.isDirectory()) { + count++; + break; + } + } + + int ret = 0; + if (count > 0) { + logger.debug("Start to run LoadIncrementalHFiles"); + ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs); + logger.debug("End to run LoadIncrementalHFiles"); + return ret; + } else { + logger.debug("Nothing to load, cube is empty"); + return ret; + } } public static void main(String[] args) throws Exception {