This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit aaa1debe72f3f9ab915b54b4bb4c5d399842852d Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Tue Nov 1 15:51:43 2022 +0800 KYLIN-5369 Fix job is running, the job fails after the KE restart and failure information is displayed for several times --- .../org/apache/kylin/rest/config/initialize/BootstrapCommand.java | 6 ++++-- .../apache/kylin/rest/config/initialize/EpochChangedListener.java | 5 +++-- .../org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java | 8 ++++++++ .../src/main/java/org/apache/kylin/job/runners/FetcherRunner.java | 8 ++++++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java index 955b59c3cb..f5a682bb8f 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java @@ -29,6 +29,8 @@ import org.springframework.stereotype.Component; import lombok.val; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.atomic.AtomicBoolean; + @Slf4j @Component public class BootstrapCommand implements Runnable { @@ -50,8 +52,8 @@ public class BootstrapCommand implements Runnable { } void initProject(KylinConfig config, final ProjectInstance project) { + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName()); EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName()); scheduler.init(new JobEngineConfig(config)); if (!scheduler.hasStarted()) { throw new RuntimeException("Scheduler for " + project.getName() + " has not been started"); @@ -59,7 +61,7 @@ public class BootstrapCommand implements Runnable { return 0; }, project.getName(), 1, UnitOfWork.DEFAULT_EPOCH_ID); - 
+ scheduler.setHasFinishedTransactions(new AtomicBoolean(true)); log.info("init project {} finished", project.getName()); } } diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java index e86435d6d3..6165322f40 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.config.initialize; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; @@ -93,8 +94,8 @@ public class EpochChangedListener { } log.info("start thread of project: {}", project); + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); scheduler.init(new JobEngineConfig(kylinConfig)); if (!scheduler.hasStarted()) { throw new RuntimeException("Scheduler for " + project + " has not been started"); @@ -104,7 +105,6 @@ public class EpochChangedListener { if (!ss.getHasStarted().get()) { throw new RuntimeException("Streaming Scheduler for " + project + " has not been started"); } - QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project); qhAccelerateScheduler.init(); @@ -115,6 +115,7 @@ public class EpochChangedListener { recommendationUpdateScheduler.addProject(project); return 0; }, project, 1); + scheduler.setHasFinishedTransactions(new AtomicBoolean(true)); } else { //TODO need global leader CreateAdminUserUtils.createAllAdmins(userService, env); diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java index 8297b92239..d3efe67bb9 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Setter; import org.apache.commons.lang3.RandomUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ExecutorServiceUtil; @@ -69,6 +70,9 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { private ExecutableContext context; private AtomicBoolean initialized = new AtomicBoolean(false); private AtomicBoolean hasStarted = new AtomicBoolean(false); + + @Setter + private AtomicBoolean hasFinishedTransactions = new AtomicBoolean(false); @Getter private JobEngineConfig jobEngineConfig; @Getter @@ -226,6 +230,10 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { return hasStarted.get(); } + public boolean hasFinishedTransactions() { + return hasFinishedTransactions.get(); + } + public static double currentAvailableMem() { return 1.0 * memoryRemaining.availablePermits(); } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java index 93f9d89bdc..ab659c48a8 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java @@ -45,6 +45,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { private final ScheduledExecutorService fetcherPool; + private boolean reSchedule = true; + public 
FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool, ScheduledExecutorService fetcherPool) { super(nDefaultScheduler); @@ -95,6 +97,12 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { @Override public void doRun() { try { + // Skip one scheduling round while the NDefaultScheduler initialization transaction has not finished. + // Skip only once, so that jobs in the project can still be scheduled even if that first transaction fails. + if (!nDefaultScheduler.hasFinishedTransactions() && reSchedule) { + reSchedule = false; + return; + } val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); Map<String, Executable> runningJobs = context.getRunningJobs();