Repository: kylin Updated Branches: refs/heads/master 7e213b013 -> 006a5aab0
KYLIN-2913 Enable job retry for configurable exceptions Signed-off-by: lidongsjtu <lid...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/006a5aab Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/006a5aab Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/006a5aab Branch: refs/heads/master Commit: 006a5aab091bddf47d5a1cff19081b2245cd2a31 Parents: 7e213b0 Author: gwang3 <gwa...@ebay.com> Authored: Mon Dec 18 10:03:49 2017 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Thu Dec 21 23:12:18 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 ++ .../kylin/job/execution/AbstractExecutable.java | 36 ++++++++++++-- .../job/execution/DefaultChainedExecutable.java | 5 -- .../kylin/job/execution/ExecuteResult.java | 7 +++ .../kylin/job/RetryableTestExecutable.java | 50 ++++++++++++++++++++ .../apache/kylin/job/SelfStopExecutable.java | 2 +- .../apache/kylin/job/SucceedTestExecutable.java | 2 +- .../impl/threadpool/DefaultSchedulerTest.java | 17 +++++++ .../engine/mr/steps/CopyDictionaryStep.java | 2 +- .../mr/steps/MergeStatisticsWithOldStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +- .../UpdateCubeInfoAfterCheckpointStep.java | 2 +- .../steps/UpdateCubeInfoAfterOptimizeStep.java | 2 +- .../apache/kylin/job/ContextTestExecutable.java | 2 +- .../kylin/source/kafka/job/MergeOffsetStep.java | 2 +- .../source/kafka/job/UpdateTimeRangeStep.java | 2 +- 16 files changed, 120 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ce524b1..7763457 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -557,6 +557,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(this.getOptional("kylin.job.retry", "0")); } + public String[] getJobRetryExceptions() { + return getOptionalStringArray("kylin.job.retry-exception-classes", new String[0]); + } + public int getCubeStatsHLLPrecision() { return Integer.parseInt(getOptional("kylin.job.sampling-hll-precision", "14")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index a37cdc9..6a0db97 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.UUID; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.MailService; @@ -114,6 +115,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { Preconditions.checkArgument(executableContext instanceof DefaultContext); ExecuteResult result = null; + try { onExecuteStart(executableContext); Throwable exception; @@ -130,9 +132,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { exception = e; } retry++; - } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true); + } while (needRetry(result, exception)); - if (exception != null) { + //check exception in result to avoid retry on ChainedExecutable(only need retry on subtask actually) + if (exception != null || result.getThrowable() != null) { onExecuteError(exception, executableContext); throw new ExecuteException(exception); } @@ -172,6 +175,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return false; } + private boolean isRetryableExecutionResult(ExecuteResult result) { + if (result != null && result.getThrowable() != null && isRetrableException(result.getThrowable())) { + return true; + } + return false; + } + protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException; @Override @@ -412,8 +422,26 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return status == ExecutableState.STOPPED; } - protected boolean needRetry() { - return this.retry <= config.getJobRetry(); + protected boolean isRetrableException(Throwable t) { + return ArrayUtils.contains(KylinConfig.getInstanceFromEnv().getJobRetryExceptions(), t.getClass().getName()); + } + + // Retry will happen in below cases: + // 1) if property "kylin.job.retry-exception-classes" is not set or is null, all jobs with exceptions will retry according to the retry times. + // 2) if property "kylin.job.retry-exception-classes" is set and is not null, only jobs with the specified exceptions will retry according to the retry times. + protected boolean needRetry(ExecuteResult result, Throwable e) { + if (this.retry > KylinConfig.getInstanceFromEnv().getJobRetry()) { + return false; + } + String[] retryableEx = KylinConfig.getInstanceFromEnv().getJobRetryExceptions(); + if (retryableEx == null || retryableEx.length == 0) { + return true; + } + if ((result != null && isRetryableExecutionResult(result)) + || e != null && isRetrableException(e)) { + return true; + } + return false; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index ff8dfee..9e53459 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -150,11 +150,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai return subTasks; } - @Override - protected boolean needRetry() { - return false; - } - public final AbstractExecutable getTaskByName(String name) { for (AbstractExecutable task : subTasks) { if (task.getName() != null && task.getName().equalsIgnoreCase(name)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java index c0c9d36..fa24bb0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java @@ -32,6 +32,13 @@ public final class ExecuteResult { private final String output; private final Throwable throwable; + /** + * Default constructor to indicate a success ExecuteResult. + */ + public ExecuteResult() { + this(State.SUCCEED, "succeed"); + } + public ExecuteResult(State state) { this(state, ""); } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java new file mode 100644 index 0000000..61b1742 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class RetryableTestExecutable extends BaseTestExecutable { + private static final Logger logger = LoggerFactory.getLogger(RetryableTestExecutable.class); + + public RetryableTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) { + logger.debug("run retryable exception test. "); + String[] exceptions = KylinConfig.getInstanceFromEnv().getJobRetryExceptions(); + Throwable ex = null; + if (exceptions != null && exceptions[0] != null) { + try { + ex = (Throwable) Class.forName(exceptions[0]).newInstance(); + } catch (Exception e) { + e.printStackTrace(); + } + } + return new ExecuteResult(ExecuteResult.State.ERROR, null, ex); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java index 9a3eb48..09f5a7a 100644 --- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java @@ -43,7 +43,7 @@ public class SelfStopExecutable extends BaseTestExecutable { return new ExecuteResult(ExecuteResult.State.STOPPED, "stopped"); } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } finally { doingWork = false; } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java index 1421f10..58ed515 100644 --- a/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/SucceedTestExecutable.java @@ -36,6 +36,6 @@ public class SucceedTestExecutable extends BaseTestExecutable { Thread.sleep(1000); } catch (InterruptedException e) { } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index c8b251d..badd483 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; +import org.apache.kylin.job.RetryableTestExecutable; import org.apache.kylin.job.SelfStopExecutable; import org.apache.kylin.job.SucceedTestExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -145,4 +146,20 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(7, TimeUnit.SECONDS)); assertFalse("future2 should has been stopped", future2.cancel(true)); } + + @Test + public void testRetryableException() throws Exception { + System.setProperty("kylin.job.retry-exception-classes", "java.io.FileNotFoundException"); + System.setProperty("kylin.job.retry", "3"); + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new RetryableTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java index 3341be9..c0b3c99 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java @@ -65,6 +65,6 @@ public class CopyDictionaryStep extends AbstractExecutable { return ExecuteResult.createError(e); } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java index eca0499..3f12b0d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java @@ -134,7 +134,7 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable { .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, optimizeSegment); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } catch (IOException e) { logger.error("fail to merge cuboid statistics", e); return ExecuteResult.createError(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 7d36643..beb9357 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -73,7 +73,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { } cubeManager.promoteNewlyBuiltSegments(cube, segment); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } catch (IOException e) { logger.error("fail to update cube after build", e); return ExecuteResult.createError(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java index ed61b4a..80811be 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java @@ -59,7 +59,7 @@ public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable { } cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats, newSegments.toArray(new CubeSegment[newSegments.size()])); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } catch (Exception e) { logger.error("fail to update cube after build", e); return ExecuteResult.createError(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java index 13c4f40..d013386 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java @@ -62,7 +62,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable { try { cubeManager.promoteNewlyOptimizeSegments(cube, segment); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } catch (IOException e) { logger.error("fail to update cube after build", e); return ExecuteResult.createError(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java index 4696e67..9b4f299 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java @@ -36,7 +36,7 @@ public class ContextTestExecutable extends AbstractExecutable { } catch (InterruptedException e) { } if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) { - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } else { return new ExecuteResult(ExecuteResult.State.ERROR, "error"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java index 8139342..fe5812b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java @@ -73,7 +73,7 @@ public class MergeOffsetStep extends AbstractExecutable { cubeBuilder.setToUpdateSegs(segment); try { cubeManager.updateCube(cubeBuilder); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } catch (IOException e) { logger.error("fail to update cube segment offset", e); return ExecuteResult.createError(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/006a5aab/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java index 8c31c70..183271d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java @@ -37,7 +37,7 @@ public class UpdateTimeRangeStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + return new ExecuteResult(); } }