Repository: kylin Updated Branches: refs/heads/master 6914f3f40 -> 53aafa97e
KYLIN-2655 There are some minor problems with the duration of the job when resuming the error job or stopped job. Signed-off-by: Billy Liu <billy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/53aafa97 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/53aafa97 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/53aafa97 Branch: refs/heads/master Commit: 53aafa97e150d8ac9dd4dfc54d6b55b12a70240c Parents: 6914f3f Author: 10069681 <peng.jian...@zte.com.cn> Authored: Thu Jun 1 20:11:01 2017 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Fri Jun 2 09:23:57 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/JobInstance.java | 16 +++++++++++++++ .../kylin/job/execution/AbstractExecutable.java | 21 ++++++++++++++++---- .../job/execution/DefaultChainedExecutable.java | 12 +++++++++-- .../engine/mr/common/JobInfoConverter.java | 3 ++- .../apache/kylin/tool/JobInstanceExtractor.java | 3 ++- 5 files changed, 47 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java index 8dcdff6..3778834 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java +++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java @@ -58,6 +58,8 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI private long execStartTime; @JsonProperty("exec_end_time") private long execEndTime; + @JsonProperty("exec_interrupt_time") + private long execInterruptTime; @JsonProperty("mr_waiting") private long mrWaiting = 0; @JsonManagedReference @@ -203,6 +205,20 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI } /** + * @return the execInterruptTime + */ + public long getExecInterruptTime() { + return execInterruptTime; + } + + /** + * @param execInterruptTime the execInterruptTime to set + */ + public void setExecInterruptTime(long execInterruptTime) { + this.execInterruptTime = execInterruptTime; + } + + /** * @param execEndTime the execEndTime to set */ public void setExecEndTime(long execEndTime) { http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 1f1be41..d36f598 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -48,6 +48,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final String NOTIFY_LIST = "notify_list"; protected static final String START_TIME = "startTime"; protected static final String END_TIME = "endTime"; + protected static final String INTERRUPT_TIME = "interruptTime"; protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); protected int retry = 0; @@ -322,15 +323,19 @@ public abstract class AbstractExecutable implements Executable, Idempotent { public static long getEndTime(Output output) { return getExtraInfoAsLong(output, END_TIME, 0L); } + + public static long getInterruptTime(Output output) { + return getExtraInfoAsLong(output, INTERRUPT_TIME, 0L); + } - public static long getDuration(long startTime, long endTime) { + public static long getDuration(long startTime, long endTime, long interruptTime) { if (startTime == 0) { return 0; } if (endTime == 0) { - return System.currentTimeMillis() - startTime; + return System.currentTimeMillis() - startTime - interruptTime; } else { - return endTime - startTime; + return endTime - startTime - interruptTime; } } @@ -359,6 +364,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { addExtraInfo(END_TIME, time + ""); } + public final void setInterruptTime(long time) { + addExtraInfo(INTERRUPT_TIME, time + ""); + } + public final long getStartTime() { return getExtraInfoAsLong(START_TIME, 0L); } @@ -367,8 +376,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return getExtraInfoAsLong(END_TIME, 0L); } + public final long getInterruptTime() { + return getExtraInfoAsLong(INTERRUPT_TIME, 0L); + } + public final long getDuration() { - return getDuration(getStartTime(), getEndTime()); + return getDuration(getStartTime(), getEndTime(), getInterruptTime()); } /* http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 8bcaaad..ae129ab 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -70,11 +70,19 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai @Override protected void onExecuteStart(ExecutableContext executableContext) { Map<String, String> info = Maps.newHashMap(); - info.put(START_TIME, Long.toString(System.currentTimeMillis())); final long startTime = getStartTime(); if (startTime > 0) { - getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + final long endTime = getEndTime(); + if (endTime > 0) { + long interruptTime = System.currentTimeMillis() - endTime + getInterruptTime(); + info.put(START_TIME, Long.toString(startTime)); + info.put(INTERRUPT_TIME, Long.toString(interruptTime)); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); + } else { + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + } } else { + info.put(START_TIME, Long.toString(System.currentTimeMillis())); getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index 189e019..c465e3f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -53,7 +53,8 @@ public class JobInfoConverter { result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); result.setExecStartTime(AbstractExecutable.getStartTime(output)); result.setExecEndTime(AbstractExecutable.getEndTime(output)); - result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime()) / 1000); + result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output)); + result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(), result.getExecInterruptTime()) / 1000); for (int i = 0; i < cubeJob.getTasks().size(); ++i) { AbstractExecutable task = cubeJob.getTasks().get(i); result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java index 068dbda..52fd0a0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java @@ -135,7 +135,8 @@ public class JobInstanceExtractor extends AbstractInfoExtractor { result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); result.setExecStartTime(AbstractExecutable.getStartTime(output)); result.setExecEndTime(AbstractExecutable.getEndTime(output)); - result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output), AbstractExecutable.getEndTime(output)) / 1000); + result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output)); + result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output), AbstractExecutable.getEndTime(output), AbstractExecutable.getInterruptTime(output)) / 1000); for (int i = 0; i < cubeJob.getTasks().size(); ++i) { AbstractExecutable task = cubeJob.getTasks().get(i); result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));