Repository: kylin
Updated Branches:
  refs/heads/master 6914f3f40 -> 53aafa97e


KYLIN-2655 Fix minor problems with the job duration when resuming an errored 
or stopped job.

Signed-off-by: Billy Liu <billy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/53aafa97
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/53aafa97
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/53aafa97

Branch: refs/heads/master
Commit: 53aafa97e150d8ac9dd4dfc54d6b55b12a70240c
Parents: 6914f3f
Author: 10069681 <peng.jian...@zte.com.cn>
Authored: Thu Jun 1 20:11:01 2017 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Fri Jun 2 09:23:57 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/JobInstance.java  | 16 +++++++++++++++
 .../kylin/job/execution/AbstractExecutable.java | 21 ++++++++++++++++----
 .../job/execution/DefaultChainedExecutable.java | 12 +++++++++--
 .../engine/mr/common/JobInfoConverter.java      |  3 ++-
 .../apache/kylin/tool/JobInstanceExtractor.java |  3 ++-
 5 files changed, 47 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java 
b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
index 8dcdff6..3778834 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -58,6 +58,8 @@ public class JobInstance extends RootPersistentEntity 
implements Comparable<JobI
     private long execStartTime;
     @JsonProperty("exec_end_time")
     private long execEndTime;
+    @JsonProperty("exec_interrupt_time")
+    private long execInterruptTime;
     @JsonProperty("mr_waiting")
     private long mrWaiting = 0;
     @JsonManagedReference
@@ -203,6 +205,20 @@ public class JobInstance extends RootPersistentEntity 
implements Comparable<JobI
     }
 
     /**
+     * @return the execInterruptTime
+     */
+    public long getExecInterruptTime() {
+        return execInterruptTime;
+    }
+
+    /**
+     * @param execInterruptTime the execInterruptTime to set
+     */
+    public void setExecInterruptTime(long execInterruptTime) {
+        this.execInterruptTime = execInterruptTime;
+    }
+
+    /**
      * @param execEndTime the execEndTime to set
      */
     public void setExecEndTime(long execEndTime) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 1f1be41..d36f598 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -48,6 +48,7 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
     protected static final String NOTIFY_LIST = "notify_list";
     protected static final String START_TIME = "startTime";
     protected static final String END_TIME = "endTime";
+    protected static final String INTERRUPT_TIME = "interruptTime";
 
     protected static final Logger logger = 
LoggerFactory.getLogger(AbstractExecutable.class);
     protected int retry = 0;
@@ -322,15 +323,19 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
     public static long getEndTime(Output output) {
         return getExtraInfoAsLong(output, END_TIME, 0L);
     }
+    
+    public static long getInterruptTime(Output output) {
+        return getExtraInfoAsLong(output, INTERRUPT_TIME, 0L);
+    }
 
-    public static long getDuration(long startTime, long endTime) {
+    public static long getDuration(long startTime, long endTime, long 
interruptTime) {
         if (startTime == 0) {
             return 0;
         }
         if (endTime == 0) {
-            return System.currentTimeMillis() - startTime;
+            return System.currentTimeMillis() - startTime - interruptTime;
         } else {
-            return endTime - startTime;
+            return endTime - startTime - interruptTime;
         }
     }
 
@@ -359,6 +364,10 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
         addExtraInfo(END_TIME, time + "");
     }
 
+    public final void setInterruptTime(long time) {
+        addExtraInfo(INTERRUPT_TIME, time + "");
+    }
+
     public final long getStartTime() {
         return getExtraInfoAsLong(START_TIME, 0L);
     }
@@ -367,8 +376,12 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
         return getExtraInfoAsLong(END_TIME, 0L);
     }
 
+    public final long getInterruptTime() {
+        return getExtraInfoAsLong(INTERRUPT_TIME, 0L);
+    }
+
     public final long getDuration() {
-        return getDuration(getStartTime(), getEndTime());
+        return getDuration(getStartTime(), getEndTime(), getInterruptTime());
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
 
b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 8bcaaad..ae129ab 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -70,11 +70,19 @@ public class DefaultChainedExecutable extends 
AbstractExecutable implements Chai
     @Override
     protected void onExecuteStart(ExecutableContext executableContext) {
         Map<String, String> info = Maps.newHashMap();
-        info.put(START_TIME, Long.toString(System.currentTimeMillis()));
         final long startTime = getStartTime();
         if (startTime > 0) {
-            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, 
null, null);
+            final long endTime = getEndTime();
+            if (endTime > 0) {
+                long interruptTime = System.currentTimeMillis() - endTime + 
getInterruptTime();
+                info.put(START_TIME, Long.toString(startTime));
+                info.put(INTERRUPT_TIME, Long.toString(interruptTime));
+                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, 
info, null);
+            } else {
+                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, 
null, null);
+            }
         } else {
+            info.put(START_TIME, Long.toString(System.currentTimeMillis()));
             getManager().updateJobOutput(getId(), ExecutableState.RUNNING, 
info, null);
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 189e019..c465e3f 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -53,7 +53,8 @@ public class JobInfoConverter {
         result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, 
CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
         result.setExecStartTime(AbstractExecutable.getStartTime(output));
         result.setExecEndTime(AbstractExecutable.getEndTime(output));
-        
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime()) / 1000);
+        
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
+        
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime(), result.getExecInterruptTime()) / 1000);
         for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
             AbstractExecutable task = cubeJob.getTasks().get(i);
             result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));

http://git-wip-us.apache.org/repos/asf/kylin/blob/53aafa97/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java 
b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
index 068dbda..52fd0a0 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
@@ -135,7 +135,8 @@ public class JobInstanceExtractor extends 
AbstractInfoExtractor {
         result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, 
CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
         result.setExecStartTime(AbstractExecutable.getStartTime(output));
         result.setExecEndTime(AbstractExecutable.getEndTime(output));
-        
result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output),
 AbstractExecutable.getEndTime(output)) / 1000);
+        
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
+        
result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output),
 AbstractExecutable.getEndTime(output), 
AbstractExecutable.getInterruptTime(output)) / 1000);
         for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
             AbstractExecutable task = cubeJob.getTasks().get(i);
             result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));

Reply via email to