This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6816947de43cc1f3a7101c06a1d32d623a36ef06
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Wed Jun 28 18:28:54 2023 +0800

    KYLIN-5762 Initialize job scheduler encounters NPE

    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../java/org/apache/kylin/job/execution/ExecutableContext.java     | 9 ++++++---
 .../java/org/apache/kylin/job/execution/ExecutableThread.java      | 1 +
 .../src/main/java/org/apache/kylin/job/runners/JobRunner.java      | 3 ++-
 .../org/apache/kylin/job/execution/ExecutableContextTest.java      | 4 +++-
 .../org/apache/kylin/job/execution/NExecutableManagerTest.java     | 2 +-
 .../java/org/apache/kylin/rest/service/DagJobServiceTest.java      | 8 ++++----
 6 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
index 16c9e9f576..fea0a74f09 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
@@ -23,10 +23,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.common.KylinConfig;
-
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -68,7 +68,6 @@ public class ExecutableContext {
 
     // Only used when the job starts scheduling
     public void addRunningJob(Executable executable) {
-        runningJobThreads.put(executable.getId(), Thread.currentThread());
         runningJobs.put(executable.getId(), executable);
         runningJobInfos.put(executable.getId(), System.currentTimeMillis());
     }
@@ -84,6 +83,10 @@ public class ExecutableContext {
         return runningJobThreads.get(executable.getId());
     }
 
+    public void addRunningJobThread(Executable executable) {
+        runningJobThreads.put(executable.getId(), Thread.currentThread());
+    }
+
     public Map<String, Executable> getRunningJobs() {
         return Collections.unmodifiableMap(runningJobs);
     }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
index 61140daeec..162eed1daa 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
@@ -50,6 +50,7 @@ public class ExecutableThread extends Thread {
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
             context.addRunningJob(executable);
+            context.addRunningJobThread(executable);
             dagExecutable.executeDagExecutable(dagExecutablesMap, executable, context);
         } finally {
             context.removeRunningJob(executable);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
index 3e839f57f4..b8cd19bed0 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -18,13 +18,13 @@
 package org.apache.kylin.job.runners;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
 import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.common.logging.SetLogCategory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +50,7 @@ public class JobRunner extends AbstractDefaultSchedulerRunner {
         val jobIdSimple = executable.getId().substring(0, 8);
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            context.addRunningJobThread(executable);
             executable.execute(context);
             // trigger the next step asap
             fetcherRunner.scheduleNext();
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
index db4e2ff2b0..b294d0458f 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
@@ -24,10 +24,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.junit.jupiter.api.Test;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import lombok.val;
 
 @MetadataInfo
@@ -39,7 +39,9 @@ class ExecutableContextTest {
         val context = new ExecutableContext(Maps.newConcurrentMap(), Maps.newConcurrentMap(),
                 KylinConfig.getInstanceFromEnv(), 0);
         context.addRunningJob(job);
+        assertNull(context.getRunningJobThread(job));
 
+        context.addRunningJobThread(job);
         assertNotNull(context.getRunningJobThread(job));
 
         context.removeRunningJob(job);
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index d7a891c74d..e56c8f260e 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -861,7 +861,7 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
 
         new Thread(() -> {
             try {
-                scheduler.getContext().addRunningJob(job);
+                scheduler.getContext().addRunningJobThread(job);
                 job.doWork(scheduler.getContext());
             } catch (ExecuteException ignored) {
             } finally {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
index 30e9b0380a..3417887425 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
@@ -27,6 +27,8 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.constant.JobActionEnum;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -54,8 +56,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import lombok.val;
 
 @MetadataInfo
@@ -275,7 +275,7 @@ class DagJobServiceTest {
     }
 
     @Test
-    void updateStepStatus() throws Exception {
+    void updateStepStatus() {
         val config = KylinConfig.getInstanceFromEnv();
         val sparkMaster = config.getSparkMaster();
         val scheduler = NDefaultScheduler.getInstance(DEFAULT_PROJECT);
@@ -290,7 +290,7 @@ class DagJobServiceTest {
         executable.killApplicationIfExistsOrUpdateStepStatus();
         Assertions.assertNull(context.getRunningJobThread(executable));
 
-        context.addRunningJob(executable);
+        context.addRunningJobThread(executable);
         Assertions.assertNotNull(context.getRunningJobThread(executable));
         executable.killApplicationIfExistsOrUpdateStepStatus();
         Assertions.assertNull(context.getRunningJobThread(executable));