This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3e929fe58b8c4b982ad179b2b15d9935378f6bb4 Author: Ni Chunen <chunen...@kyligence.io> AuthorDate: Fri Oct 19 14:25:10 2018 +0800 KYLIN-3647 Fix inconsistent states of job and its sub-task --- .../kylin/job/execution/AbstractExecutable.java | 20 +++++------ .../job/execution/DefaultChainedExecutable.java | 34 ++----------------- .../kylin/job/execution/ExecutableManager.java | 26 +++++++++++++-- .../job/impl/threadpool/DefaultScheduler.java | 4 +++ .../kylin/job/PersistExceptionExecutable.java | 39 ++++++++++++++++++++++ .../job/impl/threadpool/DefaultSchedulerTest.java | 2 +- 6 files changed, 81 insertions(+), 44 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index ad22abc..b8d3144 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -99,7 +99,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { onExecuteFinished(result, executableContext); } catch (Exception e) { logger.error(nRetry + "th retries for onExecuteFinished fails due to {}", e); - if (isMetaDataPersistException(e)) { + if (isMetaDataPersistException(e, 5)) { exception = e; try { Thread.sleep(1000L * (long) Math.pow(4, nRetry)); @@ -211,14 +211,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent { new MailService(context.getConfig()).sendMail(users, title, content); } - private boolean isMetaDataPersistException(Exception e) { + protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException; + + @Override + public void cleanup() throws ExecuteException { + + } + + public static boolean isMetaDataPersistException(Exception e, final int maxDepth) { if (e instanceof PersistentException) { return true; } Throwable t = e.getCause(); int depth = 0; - while (t != null && depth < 5) { + while (t != null && depth < maxDepth) { depth++; if (t instanceof PersistentException) { return true; @@ -228,13 +235,6 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return false; } - protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException; - - @Override - public void cleanup() throws ExecuteException { - - } - @Override public boolean isRunnable() { return this.getStatus() == ExecutableState.READY; diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index a8a91fd..b912ecc 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -112,9 +112,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n" + "it might cause endless state, will retry to fetch subtask's state.", task.getId(), task.getName()); - boolean retryRet = retryFetchTaskStatus(task); - if (false == retryRet) - hasError = true; + getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null, + "killed due to inconsistent state"); + hasError = true; } final ExecutableState status = task.getStatus(); @@ -175,34 +175,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai this.subTasks.add(executable); } - private boolean retryFetchTaskStatus(Executable task) { - boolean hasRunning = false; - int retry = 1; - while (retry <= 10) { - ExecutableState retryState = task.getStatus(); - if (retryState == ExecutableState.RUNNING) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - logger.error("Failed to Sleep: ", e); - } - hasRunning = true; - logger.error("With {} times retry, it's state is still RUNNING", retry); - } else { - logger.info("With {} times retry, status is changed to: {}", retry, retryState); - hasRunning = false; - break; - } - retry++; - } - if (hasRunning) { - logger.error("Parent task: {} is finished, but it's subtask: {}'s state is still RUNNING \n" - + ", mark parent task failed.", getName(), task.getName()); - return false; - } - return true; - } - @Override public int getDefaultPriority() { return DEFAULT_PRIORITY; diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 788a7fb..45c37b5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -441,7 +441,6 @@ public class ExecutableManager { public void forceKillJob(String jobId) { try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - jobOutput.setStatus(ExecutableState.ERROR.toString()); List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks(); for (ExecutablePO task : tasks) { @@ -452,12 +451,31 @@ public class ExecutableManager { } break; } - executableDao.updateJobOutput(jobOutput); + + if (!jobOutput.getStatus().equals(ExecutableState.ERROR.toString())) { + jobOutput.setStatus(ExecutableState.ERROR.toString()); + executableDao.updateJobOutput(jobOutput); + } } catch (PersistentException e) { throw new RuntimeException(e); } } + public void forceKillJobWithRetry(String jobId) { + boolean done = false; + + while (!done) { + try { + forceKillJob(jobId); + done = true; + } catch (RuntimeException e) { + if (!(e.getCause() instanceof PersistentException)) { + done = true; + } + } + } + } + //for migration only //TODO delete when migration finished public void resetJobOutput(String jobId, ExecutableState state, String output) { @@ -474,6 +492,10 @@ public class ExecutableManager { } public void addJobInfo(String id, Map<String, String> info) { + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException("Current thread is interrupted, aborting"); + } + if (info == null) { return; } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index c566408..6d40be8 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -114,6 +114,10 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } catch (ExecuteException e) { logger.error("ExecuteException job:" + executable.getId(), e); } catch (Exception e) { + if (AbstractExecutable.isMetaDataPersistException(e, 5)) { + // Job fail due to PersistException + ExecutableManager.getInstance(jobEngineConfig.getConfig()).forceKillJobWithRetry(executable.getId()); + } logger.error("unknown error execute job:" + executable.getId(), e); } finally { context.removeRunningJob(executable); diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java new file mode 100644 index 0000000..78b393c --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +public class PersistExceptionExecutable extends BaseTestExecutable { + public PersistExceptionExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws PersistentException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + + throw new PersistentException("persistent exception"); + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index 9d4c575..544a5c4 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -150,7 +150,7 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { waitForJobFinish(job.getId(), 10000); Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.RUNNING, execMgr.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); } @SuppressWarnings("rawtypes")