This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eb1dfe51f171819b56bb62ef4626b2dd51b0d4c2 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Wed Mar 15 14:33:00 2023 +0800 KYLIN-5562 Fix the issue Job may be scheduled repeatedly Co-authored-by: sibing.zhang <sibing.zh...@qq.com> --- .../src/main/java/org/apache/kylin/job/execution/ExecutableContext.java | 2 ++ .../src/main/java/org/apache/kylin/job/runners/FetcherRunner.java | 1 + src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java | 1 - 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java index 0a7e76f518..3f2a51c334 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java @@ -66,12 +66,14 @@ public class ExecutableContext { return kylinConfig; } + // Only used when the job starts scheduling public void addRunningJob(Executable executable) { runningJobThreads.put(executable.getId(), Thread.currentThread()); runningJobs.put(executable.getId(), executable); runningJobInfos.put(executable.getId(), System.currentTimeMillis()); } + // Only used when the job is completed public void removeRunningJob(Executable executable) { runningJobThreads.remove(executable.getId()); runningJobs.remove(executable.getId()); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java index e9e334ecbe..0306c382ae 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java @@ -219,6 +219,7 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { if (memoryLock) { jobDesc = executable.toString(); logger.info("{} prepare to schedule", jobDesc); + context.addRunningJob(executable); jobPool.execute(new JobRunner(nDefaultScheduler, executable, this)); logger.info("{} scheduled", jobDesc); } else { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java index 74f4fbf365..3e839f57f4 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java @@ -50,7 +50,6 @@ public class JobRunner extends AbstractDefaultSchedulerRunner { val jobIdSimple = executable.getId().substring(0, 8); try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple); SetLogCategory logCategory = new SetLogCategory("schedule")) { - context.addRunningJob(executable); executable.execute(context); // trigger the next step asap fetcherRunner.scheduleNext();