This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.0-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c06e0a9df7ac3e0100c793928869c9c65ec6dde2
Author: chao long <wayn...@qq.com>
AuthorDate: Tue Sep 11 17:47:56 2018 +0800

    KYLIN-3551 Check spark counter output path exists
---
 .../apache/kylin/engine/spark/SparkExecutable.java | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 9862dc4..bc7df77 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
@@ -310,24 +311,25 @@ public class SparkExecutable extends AbstractExecutable {
 
                     throw new IllegalStateException();
                 }
-                // done, update all properties
-                Map<String, String> joblogInfo = patternedLogger.getInfo();
-
-                // read counter from hdfs
-                String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
-                if (counterOutput != null){
-                    Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
-                    joblogInfo.putAll(counterMap);
-                }
-
-                readCounters(joblogInfo);
-                getManager().addJobInfo(getId(), joblogInfo);
 
                 if (result == null) {
                     result = future.get();
                 }
-
                 if (result != null && result.getFirst() == 0) {
+                    // done, update all properties
+                    Map<String, String> joblogInfo = patternedLogger.getInfo();
+                    // read counter from hdfs
+                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                    if (counterOutput != null) {
+                        if (HadoopUtil.getWorkingFileSystem().exists(new 
Path(counterOutput))) {
+                            Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                            joblogInfo.putAll(counterMap);
+                        } else {
+                            logger.warn("Spark counter output path not exists");
+                        }
+                    }
+                    readCounters(joblogInfo);
+                    getManager().addJobInfo(getId(), joblogInfo);
                     return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                 }
 

Reply via email to