This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ac3459718de13b189523fd272b91a300ad60814f
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Wed Dec 7 14:31:54 2022 +0800

    KYLIN-5436 Fix restart build job failed
---
 .../org/apache/kylin/job/execution/NExecutableManager.java     |  1 -
 .../main/java/org/apache/kylin/job/runners/FetcherRunner.java  | 10 +++-------
 .../src/main/java/org/apache/kylin/job/runners/JobRunner.java  |  1 +
 .../org/apache/kylin/job/execution/NExecutableManagerTest.java |  2 +-
 .../org/apache/kylin/engine/spark/job/NSparkExecutable.java    |  4 +++-
 5 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 5d62003967..512d891320 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -1401,7 +1401,6 @@ public class NExecutableManager {
         if (thread != null) {
             logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName());
             thread.interrupt();
-            scheduler.getContext().removeRunningJob(executable);
         }
     }
 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index 425a228f74..e9e334ecbe 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -219,7 +219,6 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
             if (memoryLock) {
                 jobDesc = executable.toString();
                 logger.info("{} prepare to schedule", jobDesc);
-                context.addRunningJob(executable);
                 jobPool.execute(new JobRunner(nDefaultScheduler, executable, this));
                 logger.info("{} scheduled", jobDesc);
             } else {
@@ -227,12 +226,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
                         NDefaultScheduler.getMemoryRemaining().availablePermits(), executable.getDisplayName());
             }
         } catch (Exception ex) {
-            if (executable != null) {
-                context.removeRunningJob(executable);
-                if (memoryLock) {
-                    // may release twice when exception raise after jobPool execute executable
-                    NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
-                }
+            if (executable != null && memoryLock) {
+                // may release twice when exception raise after jobPool execute executable
+                NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
             }
             logger.warn("{} fail to schedule", jobDesc, ex);
         }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
index 3e839f57f4..74f4fbf365 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -50,6 +50,7 @@ public class JobRunner extends AbstractDefaultSchedulerRunner {
         val jobIdSimple = executable.getId().substring(0, 8);
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            context.addRunningJob(executable);
             executable.execute(context);
             // trigger the next step asap
             fetcherRunner.scheduleNext();
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index 8aa9bc85b5..8934603053 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -907,7 +907,7 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
         val env = getTestConfig().getDeployEnv();
         getTestConfig().setProperty("kylin.env", "PROD");
         manager.cancelJob(NExecutableManager.toPO(job, DEFAULT_PROJECT), job.getId());
-        Assertions.assertNull(scheduler.getContext().getRunningJobThread(job));
+        Assertions.assertNotNull(scheduler.getContext().getRunningJobThread(job));
         getTestConfig().setProperty("kylin.env", env);
         scheduler.shutdown();
     }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 5b02c09393..325ac9524f 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -405,7 +405,8 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         return desc;
     }
 
-    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs) {
+    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs)
+            throws JobStoppedException {
         sparkJobHandler.killOrphanApplicationIfExists(project, getId(), getConfig(), true, getSparkConf());
         try {
             Object cmd;
@@ -432,6 +433,7 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
             }
             return ExecuteResult.createSucceed(output);
         } catch (Exception e) {
+            checkNeedQuit(true);
             logger.warn("failed to execute spark submit command.");
             wrapWithExecuteExceptionUpdateJobError(e);
             return ExecuteResult.createError(e);