This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eb1dfe51f171819b56bb62ef4626b2dd51b0d4c2
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Wed Mar 15 14:33:00 2023 +0800

    KYLIN-5562 Fix the issue Job may be scheduled repeatedly
    
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../src/main/java/org/apache/kylin/job/execution/ExecutableContext.java | 2 ++
 .../src/main/java/org/apache/kylin/job/runners/FetcherRunner.java       | 1 +
 src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java  | 1 -
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
index 0a7e76f518..3f2a51c334 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
@@ -66,12 +66,14 @@ public class ExecutableContext {
         return kylinConfig;
     }
 
+    // Only used when the job starts scheduling
     public void addRunningJob(Executable executable) {
         runningJobThreads.put(executable.getId(), Thread.currentThread());
         runningJobs.put(executable.getId(), executable);
         runningJobInfos.put(executable.getId(), System.currentTimeMillis());
     }
 
+    // Only used when the job is completed
     public void removeRunningJob(Executable executable) {
         runningJobThreads.remove(executable.getId());
         runningJobs.remove(executable.getId());
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index e9e334ecbe..0306c382ae 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -219,6 +219,7 @@ public class FetcherRunner extends 
AbstractDefaultSchedulerRunner {
             if (memoryLock) {
                 jobDesc = executable.toString();
                 logger.info("{} prepare to schedule", jobDesc);
+                context.addRunningJob(executable);
                 jobPool.execute(new JobRunner(nDefaultScheduler, executable, 
this));
                 logger.info("{} scheduled", jobDesc);
             } else {
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
index 74f4fbf365..3e839f57f4 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -50,7 +50,6 @@ public class JobRunner extends AbstractDefaultSchedulerRunner 
{
         val jobIdSimple = executable.getId().substring(0, 8);
         try (SetThreadName ignored = new 
SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
-            context.addRunningJob(executable);
             executable.execute(context);
             // trigger the next step asap
             fetcherRunner.scheduleNext();

Reply via email to