This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.0-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 66e55ddc6432996ba7a334f36c9377b8d24d4c4f
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Tue Sep 11 22:25:46 2018 +0800

    KYLIN-3554 Kylin move to next step on Spark job resumed
---
 .../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index bc7df77..a354909 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -189,12 +189,14 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
+    @SuppressWarnings("checkstyle:methodlength")
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         ExecutableManager mgr = getManager();
         Map<String, String> extra = mgr.getOutput(getId()).getExtra();
-        if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
-            return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
+        String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
+        if (!StringUtils.isEmpty(sparkJobId)) {
+            return onResumed(sparkJobId, mgr);
         } else {
             String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
             CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
@@ -325,14 +327,17 @@ public class SparkExecutable extends AbstractExecutable {
                             Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
                             joblogInfo.putAll(counterMap);
                         } else {
-                            logger.warn("Spark counter output path not exists");
+                            logger.warn("Spark counter output path not exists: " + counterOutput);
                         }
                     }
                     readCounters(joblogInfo);
                     getManager().addJobInfo(getId(), joblogInfo);
                     return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                 }
-
+                // clear SPARK_JOB_ID on job failure.
+                extra = mgr.getOutput(getId()).getExtra();
+                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
+                getManager().addJobInfo(getId(), extra);
                 return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
             } catch (Exception e) {
                 logger.error("error run spark job:", e);
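The patch makes two related changes: the resume guard now treats an empty
SPARK_JOB_ID as "not started" (containsKey() alone would resume against a
stale marker), and a failed attempt blanks the marker so a retry resubmits
the Spark job instead of moving to the next step. Below is a minimal,
self-contained sketch of that pattern; the names ResumableStep,
submitSparkJob, and failNextRun are hypothetical stand-ins for illustration,
not Kylin's actual API.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch (not Kylin code) of the resume-marker pattern in KYLIN-3554.
// A step persists the external Spark job id in its output metadata; on resume
// it re-attaches only when that marker is a non-empty string, and on failure
// it blanks the marker so the next attempt submits a fresh job.
public class ResumableStep {
    static final String SPARK_JOB_ID = "sparkJobId";

    // Stands in for the persisted job output (ExecutableManager in Kylin).
    private final Map<String, String> extra = new HashMap<>();

    private boolean failNextRun; // test hook to simulate a failed attempt
    private int submissions;

    public String run() {
        String sparkJobId = extra.get(SPARK_JOB_ID);
        // containsKey() alone would be wrong here: a failed attempt leaves an
        // empty marker behind, which must mean "start over", not "resume".
        if (sparkJobId != null && !sparkJobId.isEmpty()) {
            return "resumed:" + sparkJobId;
        }
        String newJobId = submitSparkJob();
        extra.put(SPARK_JOB_ID, newJobId);
        if (failNextRun) {
            failNextRun = false;
            // Clear the marker on failure so a retry resubmits the job.
            extra.put(SPARK_JOB_ID, "");
            return "error";
        }
        return "succeed:" + newJobId;
    }

    private String submitSparkJob() {
        return "application_000" + (++submissions); // hypothetical job id
    }

    public static void main(String[] args) {
        ResumableStep step = new ResumableStep();
        step.failNextRun = true;
        System.out.println(step.run()); // error (marker blanked, not left stale)
        System.out.println(step.run()); // succeed:application_0002 (resubmitted)
        System.out.println(step.run()); // resumed:application_0002
    }
}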