KYLIN-1038 retry on job failure
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3af7d4a7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3af7d4a7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3af7d4a7 Branch: refs/heads/1.5.x-HBase1.1.3 Commit: 3af7d4a72c8308f00fe95276b08f05709eaa62e5 Parents: 17c33dc Author: shaofengshi <[email protected]> Authored: Sun Feb 14 21:17:12 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Tue Mar 8 12:13:16 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 3 ++ .../apache/kylin/common/KylinConfigBase.java | 4 ++ .../kylin/job/execution/AbstractExecutable.java | 57 +++++++++++++------- .../job/execution/DefaultChainedExecutable.java | 5 ++ 4 files changed, 50 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 78a564d..d694e9f 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -23,6 +23,9 @@ kylin.hbase.cluster.fs= kylin.job.mapreduce.default.reduce.input.mb=500 +# max job retry on error, default 0: no retry +kylin.job.retry=0 + # If true, job engine will not assume that hadoop CLI reside on the same server as it self # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 826a28c..487f78e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -538,6 +538,10 @@ public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("crossdomain.enable", "true")); } + public int getJobRetry() { + return Integer.parseInt(this.getOptional("kylin.job.retry", "0")); + } + public String toString() { return getMetadataUrl(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index e1d7106..8d5fea5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -18,13 +18,10 @@ package org.apache.kylin.job.execution; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; - +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.KylinConfig; @@ -35,10 +32,12 @@ import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; /** */ @@ -50,6 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final String END_TIME = "endTime"; protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); + protected int retry = 0; private String name; private String id; @@ -99,15 +99,30 @@ public abstract class AbstractExecutable implements Executable, Idempotent { logger.info("Executing >>>>>>>>>>>>> " + this.getName() + " <<<<<<<<<<<<<"); Preconditions.checkArgument(executableContext instanceof DefaultContext); - ExecuteResult result; - try { - onExecuteStart(executableContext); - result = doWork(executableContext); - } catch (Throwable e) { - logger.error("error running Executable", e); - onExecuteError(e, executableContext); - throw new ExecuteException(e); + ExecuteResult result = null; + + onExecuteStart(executableContext); + Throwable exception; + do { + if (retry > 0) { + logger.info("Retry " + retry); + } + exception = null; + result = null; + try { + result = doWork(executableContext); + } catch (Throwable e) { + logger.error("error running Executable", e); + exception = e; + } + retry++; + } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true); + + if (exception != null) { + onExecuteError(exception, executableContext); + throw new ExecuteException(exception); } + onExecuteFinished(result, executableContext); return result; } @@ -301,6 +316,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return status == ExecutableState.DISCARDED; } + protected boolean needRetry() { + return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry(); + } + @Override public String toString() { return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString(); http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 2e95711..7403715 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -106,6 +106,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai return subTasks; } + @Override + protected boolean needRetry() { + return false; + } + public final AbstractExecutable getTaskByName(String name) { for (AbstractExecutable task : subTasks) { if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {
