This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.0-hadoop3.1 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c06e0a9df7ac3e0100c793928869c9c65ec6dde2 Author: chao long <wayn...@qq.com> AuthorDate: Tue Sep 11 17:47:56 2018 +0800 KYLIN-3551 Check spark counter output path exists --- .../apache/kylin/engine/spark/SparkExecutable.java | 28 ++++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 9862dc4..bc7df77 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Shell; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; @@ -310,24 +311,25 @@ public class SparkExecutable extends AbstractExecutable { throw new IllegalStateException(); } - // done, update all properties - Map<String, String> joblogInfo = patternedLogger.getInfo(); - - // read counter from hdfs - String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT); - if (counterOutput != null){ - Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput); - joblogInfo.putAll(counterMap); - } - - readCounters(joblogInfo); - getManager().addJobInfo(getId(), joblogInfo); if (result == null) { result = future.get(); } - if (result != null && result.getFirst() == 0) { + // done, update all properties + Map<String, String> joblogInfo = patternedLogger.getInfo(); + // read counter from hdfs + String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT); + if (counterOutput != null) { + if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) { + Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput); + joblogInfo.putAll(counterMap); + } else { + logger.warn("Spark counter output path not exists"); + } + } + readCounters(joblogInfo); + getManager().addJobInfo(getId(), joblogInfo); return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog()); }