This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6816947de43cc1f3a7101c06a1d32d623a36ef06
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Wed Jun 28 18:28:54 2023 +0800

    KYLIN-5762 Initialize job scheduler encounters NPE
    
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../java/org/apache/kylin/job/execution/ExecutableContext.java   | 9 ++++++---
 .../java/org/apache/kylin/job/execution/ExecutableThread.java    | 1 +
 .../src/main/java/org/apache/kylin/job/runners/JobRunner.java    | 3 ++-
 .../org/apache/kylin/job/execution/ExecutableContextTest.java    | 4 +++-
 .../org/apache/kylin/job/execution/NExecutableManagerTest.java   | 2 +-
 .../java/org/apache/kylin/rest/service/DagJobServiceTest.java    | 8 ++++----
 6 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
index 16c9e9f576..fea0a74f09 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
@@ -23,10 +23,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.common.KylinConfig;
-
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -68,7 +68,6 @@ public class ExecutableContext {
 
     // Only used when the job starts scheduling
     public void addRunningJob(Executable executable) {
-        runningJobThreads.put(executable.getId(), Thread.currentThread());
         runningJobs.put(executable.getId(), executable);
         runningJobInfos.put(executable.getId(), System.currentTimeMillis());
     }
@@ -84,6 +83,10 @@ public class ExecutableContext {
         return runningJobThreads.get(executable.getId());
     }
 
+    public void addRunningJobThread(Executable executable) {
+        runningJobThreads.put(executable.getId(), Thread.currentThread());
+    }
+
     public Map<String, Executable> getRunningJobs() {
         return Collections.unmodifiableMap(runningJobs);
     }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
index 61140daeec..162eed1daa 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java
@@ -50,6 +50,7 @@ public class ExecutableThread extends Thread {
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
             context.addRunningJob(executable);
+            context.addRunningJobThread(executable);
             dagExecutable.executeDagExecutable(dagExecutablesMap, executable, context);
         } finally {
             context.removeRunningJob(executable);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
index 3e839f57f4..b8cd19bed0 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -18,13 +18,13 @@
 package org.apache.kylin.job.runners;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.logging.SetLogCategory;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
 import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.common.logging.SetLogCategory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +50,7 @@ public class JobRunner extends AbstractDefaultSchedulerRunner {
         val jobIdSimple = executable.getId().substring(0, 8);
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            context.addRunningJobThread(executable);
             executable.execute(context);
             // trigger the next step asap
             fetcherRunner.scheduleNext();
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
index db4e2ff2b0..b294d0458f 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/ExecutableContextTest.java
@@ -24,10 +24,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.junit.jupiter.api.Test;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import lombok.val;
 
 @MetadataInfo
@@ -39,7 +39,9 @@ class ExecutableContextTest {
         val context = new ExecutableContext(Maps.newConcurrentMap(), Maps.newConcurrentMap(),
                 KylinConfig.getInstanceFromEnv(), 0);
         context.addRunningJob(job);
+        assertNull(context.getRunningJobThread(job));
 
+        context.addRunningJobThread(job);
         assertNotNull(context.getRunningJobThread(job));
 
         context.removeRunningJob(job);
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index d7a891c74d..e56c8f260e 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -861,7 +861,7 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
 
         new Thread(() -> {
             try {
-                scheduler.getContext().addRunningJob(job);
+                scheduler.getContext().addRunningJobThread(job);
                 job.doWork(scheduler.getContext());
             } catch (ExecuteException ignored) {
             } finally {
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
index 30e9b0380a..3417887425 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java
@@ -27,6 +27,8 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.constant.JobActionEnum;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -54,8 +56,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import lombok.val;
 
 @MetadataInfo
@@ -275,7 +275,7 @@ class DagJobServiceTest {
     }
 
     @Test
-    void updateStepStatus() throws Exception {
+    void updateStepStatus() {
         val config = KylinConfig.getInstanceFromEnv();
         val sparkMaster = config.getSparkMaster();
         val scheduler = NDefaultScheduler.getInstance(DEFAULT_PROJECT);
@@ -290,7 +290,7 @@ class DagJobServiceTest {
                 executable.killApplicationIfExistsOrUpdateStepStatus();
                 Assertions.assertNull(context.getRunningJobThread(executable));
 
-                context.addRunningJob(executable);
+                context.addRunningJobThread(executable);
                 Assertions.assertNotNull(context.getRunningJobThread(executable));
                 executable.killApplicationIfExistsOrUpdateStepStatus();
                 Assertions.assertNull(context.getRunningJobThread(executable));

Reply via email to