This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f0346b  KYLIN-3647 Fix inconsistent states of job and its sub-task
0f0346b is described below

commit 0f0346be2dfbb753ccfb9db698c5f46d6544a190
Author: Ni Chunen <chunen...@kyligence.io>
AuthorDate: Fri Oct 19 14:25:10 2018 +0800

    KYLIN-3647 Fix inconsistent states of job and its sub-task
---
 .../kylin/job/execution/AbstractExecutable.java    | 20 +++++------
 .../job/execution/DefaultChainedExecutable.java    | 34 ++-----------------
 .../kylin/job/execution/ExecutableManager.java     | 26 +++++++++++++--
 .../job/impl/threadpool/DefaultScheduler.java      |  4 +++
 .../kylin/job/PersistExceptionExecutable.java      | 39 ++++++++++++++++++++++
 .../job/impl/threadpool/DefaultSchedulerTest.java  |  2 +-
 6 files changed, 81 insertions(+), 44 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index ad22abc..b8d3144 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -99,7 +99,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 onExecuteFinished(result, executableContext);
             } catch (Exception e) {
                 logger.error(nRetry + "th retries for onExecuteFinished fails 
due to {}", e);
-                if (isMetaDataPersistException(e)) {
+                if (isMetaDataPersistException(e, 5)) {
                     exception = e;
                     try {
                         Thread.sleep(1000L * (long) Math.pow(4, nRetry));
@@ -211,14 +211,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         new MailService(context.getConfig()).sendMail(users, title, content);
     }
 
-    private boolean isMetaDataPersistException(Exception e) {
+    protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException, PersistentException;
+
+    @Override
+    public void cleanup() throws ExecuteException {
+
+    }
+
+    public static boolean isMetaDataPersistException(Exception e, final int 
maxDepth) {
         if (e instanceof PersistentException) {
             return true;
         }
 
         Throwable t = e.getCause();
         int depth = 0;
-        while (t != null && depth < 5) {
+        while (t != null && depth < maxDepth) {
             depth++;
             if (t instanceof PersistentException) {
                 return true;
@@ -228,13 +235,6 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return false;
     }
 
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException;
-
-    @Override
-    public void cleanup() throws ExecuteException {
-
-    }
-
     @Override
     public boolean isRunnable() {
         return this.getStatus() == ExecutableState.READY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index ec660fd..b912ecc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -112,9 +112,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                             "There shouldn't be a running subtask[jobId: {}, 
jobName: {}], \n"
                                     + "it might cause endless state, will 
retry to fetch subtask's state.",
                             task.getId(), task.getName());
-                    boolean retryRet = retryFetchTaskStatus(task);
-                    if (false == retryRet)
-                        hasError = true;
+                    getManager().updateJobOutput(task.getId(), 
ExecutableState.ERROR, null,
+                            "killed due to inconsistent state");
+                    hasError = true;
                 }
 
                 final ExecutableState status = task.getStatus();
@@ -175,34 +175,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         this.subTasks.add(executable);
     }
 
-    private boolean retryFetchTaskStatus(Executable task) {
-        boolean hasRunning = false;
-        int retry = 1;
-        while (retry <= 10) {
-            ExecutableState retryState = task.getStatus();
-            if (retryState == ExecutableState.RUNNING) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    logger.error("Failed to Sleep: ", e);
-                }
-                hasRunning = true;
-                logger.error("With {} times retry, it's state is still 
RUNNING", retry);
-            } else {
-                logger.info("With {} times retry, status is changed to: {}", 
retry, retryState);
-                hasRunning = false;
-                break;
-            }
-            retry++;
-        }
-        if (hasRunning) {
-            logger.error("Parent task: {} is finished, but it's subtask: {}'s 
state is still RUNNING \n"
-                    + ", mark parent task failed.", getName(), task.getName());
-            return false;
-        }
-        return true;
-    }
-
     @Override
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5cc8a0f..09b7b8e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -442,7 +442,6 @@ public class ExecutableManager {
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = 
executableDao.getJobOutput(jobId);
-            jobOutput.setStatus(ExecutableState.ERROR.toString());
             List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks();
 
             for (ExecutablePO task : tasks) {
@@ -453,12 +452,31 @@ public class ExecutableManager {
                 }
                 break;
             }
-            executableDao.updateJobOutput(jobOutput);
+
+            if 
(!jobOutput.getStatus().equals(ExecutableState.ERROR.toString())) {
+                jobOutput.setStatus(ExecutableState.ERROR.toString());
+                executableDao.updateJobOutput(jobOutput);
+            }
         } catch (PersistentException e) {
             throw new RuntimeException(e);
         }
     }
 
+    public void forceKillJobWithRetry(String jobId) {
+        boolean done = false;
+
+        while (!done) {
+            try {
+                forceKillJob(jobId);
+                done = true;
+            } catch (RuntimeException e) {
+                if (!(e.getCause() instanceof PersistentException)) {
+                    done = true;
+                }
+            }
+        }
+    }
+
     //for migration only
     //TODO delete when migration finished
     public void resetJobOutput(String jobId, ExecutableState state, String 
output) {
@@ -475,6 +493,10 @@ public class ExecutableManager {
     }
 
     public void addJobInfo(String id, Map<String, String> info) {
+        if (Thread.currentThread().isInterrupted()) {
+            throw new RuntimeException("Current thread is interrupted, 
aborting");
+        }
+
         if (info == null) {
             return;
         }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 5dd2c7c..3be8cc7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -115,6 +115,10 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
             } catch (ExecuteException e) {
                 logger.error("ExecuteException job:" + executable.getId(), e);
             } catch (Exception e) {
+                if (AbstractExecutable.isMetaDataPersistException(e, 5)) {
+                    // Job fail due to PersistException
+                    
ExecutableManager.getInstance(jobEngineConfig.getConfig()).forceKillJobWithRetry(executable.getId());
+                }
                 logger.error("unknown error execute job:" + 
executable.getId(), e);
             } finally {
                 context.removeRunningJob(executable);
diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
new file mode 100644
index 0000000..78b393c
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class PersistExceptionExecutable extends BaseTestExecutable {
+    public PersistExceptionExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws 
PersistentException {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+
+        throw new PersistentException("persistent exception");
+    }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 9d4c575..544a5c4 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -150,7 +150,7 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
         waitForJobFinish(job.getId(), 10000);
         Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(job.getId()).getState());
         Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.RUNNING, 
execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, 
execMgr.getOutput(task2.getId()).getState());
     }
 
     @SuppressWarnings("rawtypes")

Reply via email to