KYLIN-2169 bug fix
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0c6aa760 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0c6aa760 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0c6aa760 Branch: refs/heads/KYLIN-2006 Commit: 0c6aa760e81a30b7df08c9c2015af9f03c253567 Parents: e7a20a0 Author: Yang Li <liy...@apache.org> Authored: Tue Nov 8 23:13:57 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Tue Nov 8 23:13:57 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/AbstractExecutable.java | 12 +++++------- .../job/execution/DefaultChainedExecutable.java | 20 ++++++++++---------- 2 files changed, 15 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6aa760/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index f7b8a7c..2a4b2df 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -248,14 +248,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) { try { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - List<String> users = getAllNofifyUsers(kylinConfig); + List<String> users = getAllNofifyUsers(config); if (users.isEmpty()) { logger.warn("no need to send email, user list is empty"); return; } final Pair<String, String> email = formatNotifications(context, state); - doSendMail(kylinConfig, users, email); + doSendMail(config, users, email); } catch (Exception e) { logger.error("error send email", e); } @@ -287,13 +286,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected void sendMail(Pair<String, String> email) { try { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - List<String> users = getAllNofifyUsers(kylinConfig); + List<String> users = getAllNofifyUsers(config); if (users.isEmpty()) { logger.warn("no need to send email, user list is empty"); return; } - doSendMail(kylinConfig, users, email); + doSendMail(config, users, email); } catch (Exception e) { logger.error("error send email", e); } @@ -378,7 +376,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } protected boolean needRetry() { - return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry(); + return this.retry <= config.getJobRetry(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6aa760/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index edc8189..621d51d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -33,8 +33,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai private final List<AbstractExecutable> subTasks = Lists.newArrayList(); - protected final ExecutableManager jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - public DefaultChainedExecutable() { super(); } @@ -65,9 +63,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai info.put(START_TIME, Long.toString(System.currentTimeMillis())); final long startTime = getStartTime(); if (startTime > 0) { - jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else { - jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); } } @@ -79,6 +77,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai @Override protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { + ExecutableManager mgr = getManager(); + if (isDiscarded()) { setEndTime(System.currentTimeMillis()); notifyUserStatusChange(executableContext, ExecutableState.DISCARDED); @@ -105,22 +105,22 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai } if (allSucceed) { setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null); + mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null); notifyUserStatusChange(executableContext, ExecutableState.SUCCEED); } else if (hasError) { setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null); + mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null); notifyUserStatusChange(executableContext, ExecutableState.ERROR); } else if (hasRunning) { - jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + mgr.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } else if (hasDiscarded) { - jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); + mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); } else { - jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); + mgr.updateJobOutput(getId(), ExecutableState.READY, null, null); } } else { setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); + mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); notifyUserStatusChange(executableContext, ExecutableState.ERROR); } }