This is an automated email from the ASF dual-hosted git repository. nju_yaho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eddab3791b3322e8f8c42d5254e03c4f334897af Author: Zhong <nju_y...@apache.org> AuthorDate: Wed Jun 20 17:46:10 2018 +0800 KYLIN-3421 improve the fetcher runner in job scheduler --- .../job/impl/threadpool/DefaultFetcherRunner.java | 104 ++++++++++ .../job/impl/threadpool/DefaultScheduler.java | 210 ++------------------- .../job/impl/threadpool/DistributedScheduler.java | 116 ++++-------- .../kylin/job/impl/threadpool/FetcherRunner.java | 77 ++++++++ .../kylin/job/impl/threadpool/JobExecutor.java | 25 +++ .../job/impl/threadpool/PriorityFetcherRunner.java | 146 ++++++++++++++ .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- 7 files changed, 401 insertions(+), 279 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java new file mode 100644 index 0000000..e5f15fe --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Map; + +import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultFetcherRunner extends FetcherRunner { + + private static final Logger logger = LoggerFactory.getLogger(DefaultFetcherRunner.class); + + public DefaultFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, + ExecutableManager executableManager, JobExecutor jobExecutor) { + super(jobEngineConfig, context, executableManager, jobExecutor); + } + + @Override + synchronized public void run() { + try (SetThreadName ignored = new SetThreadName(// + "FetcherRunner %s", System.identityHashCode(this))) {// + // logger.debug("Job Fetcher is running..."); + Map<String, Executable> runningJobs = context.getRunningJobs(); + if (isJobPoolFull()) { + return; + } + + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + for (final String id : executableManager.getAllJobIds()) { + if (isJobPoolFull()) { + return; + } + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + if (output.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (output.getState() == ExecutableState.ERROR) { + nError++; + } else if (output.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + if (fetchFailed) { + executableManager.forceKillJob(id); + nError++; + } else { + nOthers++; + } + } + continue; + } + + final AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + nOthers++; + continue; + } + + nReady++; + addToJobPool(executable, executable.getDefaultPriority()); + } + + fetchFailed = false; + logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + + " error, " + nDiscarded + " discarded, " + nOthers + " others"); + } catch (Throwable th) { + fetchFailed = true; // this could happen when resource store is unavailable + logger.warn("Job Fetcher caught a exception ", th); + } + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 920601d..c566408 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -18,9 +18,6 @@ package org.apache.kylin.job.impl.threadpool; -import java.util.Comparator; -import java.util.Map; -import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -31,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; @@ -40,8 +36,6 @@ import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.JobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +78,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti private JobLock jobLock; private ExecutableManager executableManager; - private Runnable fetcher; + private FetcherRunner fetcher; private ScheduledExecutorService fetcherPool; private ExecutorService jobPool; private DefaultContext context; @@ -92,7 +86,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); private volatile boolean initialized = false; private volatile boolean hasStarted = false; - volatile boolean fetchFailed = false; private JobEngineConfig jobEngineConfig; public DefaultScheduler() { @@ -101,195 +94,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti } } - private class FetcherRunnerWithPriority implements Runnable { - volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1, - new Comparator<Pair<AbstractExecutable, Integer>>() { - @Override - public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) { - return o1.getSecond() > o2.getSecond() ? -1 : 1; - } - }); - - private void addToJobPool(AbstractExecutable executable, int priority) { - String jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule and its priority is " + priority); - try { - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); - } - } - - @Override - synchronized public void run() { - try (SetThreadName ignored = new SetThreadName(// - "Scheduler %s PriorityFetcherRunner %s"// - , System.identityHashCode(DefaultScheduler.this)// - , System.identityHashCode(this)// - )) {// - // logger.debug("Job Fetcher is running..."); - Map<String, Executable> runningJobs = context.getRunningJobs(); - - // fetch job from jobPriorityQueue first to reduce chance to scan job list - Map<String, Integer> leftJobPriorities = Maps.newHashMap(); - Pair<AbstractExecutable, Integer> executableWithPriority; - while ((executableWithPriority = jobPriorityQueue.peek()) != null - // the priority of jobs in pendingJobPriorities should be above a threshold - && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { - executableWithPriority = jobPriorityQueue.poll(); - AbstractExecutable executable = executableWithPriority.getFirst(); - int curPriority = executableWithPriority.getSecond(); - // the job should wait more than one time - if (curPriority > executable.getDefaultPriority() + 1) { - addToJobPool(executable, curPriority); - } else { - leftJobPriorities.put(executable.getId(), curPriority + 1); - } - } - - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - while ((executableWithPriority = jobPriorityQueue.poll()) != null) { - leftJobPriorities.put(executableWithPriority.getFirst().getId(), - executableWithPriority.getSecond() + 1); - } - - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - - AbstractExecutable executable = executableManager.getJob(id); - if (!executable.isReady()) { - final Output output = executableManager.getOutput(id); - // logger.debug("Job id:" + id + " not runnable"); - if (output.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (output.getState() == ExecutableState.ERROR) { - nError++; - } else if (output.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (output.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - nOthers++; - } - continue; - } - - nReady++; - Integer priority = leftJobPriorities.get(id); - if (priority == null) { - priority = executable.getDefaultPriority(); - } - jobPriorityQueue.add(new Pair<>(executable, priority)); - } - - while (runningJobs.size() < jobEngineConfig.getMaxConcurrentJobLimit() - && (executableWithPriority = jobPriorityQueue.poll()) != null) { - addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); - } - - logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " - + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " // - + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers - + " others"); - } catch (Throwable th) { - logger.warn("Priority Job Fetcher caught a exception " + th); - } - } - } - - private class FetcherRunner implements Runnable { - - @Override - synchronized public void run() { - try (SetThreadName ignored = new SetThreadName(// - "Scheduler %s FetcherRunner %s"// - , System.identityHashCode(DefaultScheduler.this)// - , System.identityHashCode(this)// - )) {// - // logger.debug("Job Fetcher is running..."); - Map<String, Executable> runningJobs = context.getRunningJobs(); - if (isJobPoolFull()) { - return; - } - - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; - for (final String id : executableManager.getAllJobIds()) { - if (isJobPoolFull()) { - return; - } - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - final AbstractExecutable executable = executableManager.getJob(id); - if (!executable.isReady()) { - final Output output = executableManager.getOutput(id); - // logger.debug("Job id:" + id + " not runnable"); - if (output.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (output.getState() == ExecutableState.ERROR) { - nError++; - } else if (output.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (output.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - if (fetchFailed) { - executableManager.forceKillJob(id); - nError++; - } else { - nOthers++; - } - } - continue; - } - nReady++; - String jobDesc = null; - try { - jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule"); - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - if (executable != null) - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); - } - } - - fetchFailed = false; - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " - + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError - + " error, " + nDiscarded + " discarded, " + nOthers + " others"); - } catch (Throwable th) { - fetchFailed = true; // this could happen when resource store is unavailable - logger.warn("Job Fetcher caught a exception ", th); - } - } - } - - private boolean isJobPoolFull() { - Map<String, Executable> runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return true; - } - - return false; + public FetcherRunner getFetcherRunner() { + return fetcher; } private class JobRunner implements Runnable { @@ -367,7 +173,15 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti int pollSecond = jobEngineConfig.getPollIntervalSecond(); logger.info("Fetching jobs every {} seconds", pollSecond); - fetcher = jobEngineConfig.getJobPriorityConsidered() ? new FetcherRunnerWithPriority() : new FetcherRunner(); + JobExecutor jobExecutor = new JobExecutor() { + @Override + public void execute(AbstractExecutable executable) { + jobPool.execute(new JobRunner(executable)); + } + }; + fetcher = jobEngineConfig.getJobPriorityConsidered() + ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor) + : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor); logger.info("Creating fetcher pool instance:" + System.identityHashCode(fetcher)); fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS); hasStarted = true; diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 055de4d..cb4d815 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool; import java.io.Closeable; import java.io.IOException; -import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; @@ -63,7 +62,6 @@ import com.google.common.collect.Maps; public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); - private final static String SEGMENT_ID = "segmentId"; public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata public static DistributedScheduler getInstance(KylinConfig config) { @@ -86,57 +84,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private DistributedLock jobLock; private Closeable lockWatch; - //keep all segments having running job - private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>(); + //keep all running job + private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>(); private volatile boolean initialized = false; private volatile boolean hasStarted = false; private JobEngineConfig jobEngineConfig; private String serverName; - private class FetcherRunner implements Runnable { - @Override - synchronized public void run() { - try { - Map<String, Executable> runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - nRunning++; - continue; - } - - final Output output = executableManager.getOutput(id); - - if ((output.getState() != ExecutableState.READY)) { - if (output.getState() == ExecutableState.RUNNING) { - nOtherRunning++; - } else { - nOthers++; - } - continue; - } - - nReady++; - final AbstractExecutable executable = executableManager.getJob(id); - try { - jobPool.execute(new JobRunner(executable)); - } catch (Exception ex) { - logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex); - } - } - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " - + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others"); - } catch (Exception e) { - logger.warn("Job Fetcher caught a exception " + e); - } - } - } - private class JobRunner implements Runnable { private final AbstractExecutable executable; @@ -149,12 +103,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn public void run() { try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s", System.identityHashCode(DistributedScheduler.this), executable.getId())) { - String segmentId = executable.getParam(SEGMENT_ID); - if (jobLock.lock(getLockPath(segmentId))) { + if (jobLock.lock(getLockPath(executable.getId()))) { logger.info(executable.toString() + " scheduled in server: " + serverName); context.addRunningJob(executable); - segmentWithLocks.add(segmentId); + jobWithLocks.add(executable.getId()); executable.execute(context); } } catch (ExecuteException e) { @@ -172,21 +125,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn //release job lock when job state is ready or running and the job server keep the cube lock. private void releaseJobLock(AbstractExecutable executable) { if (executable instanceof DefaultChainedExecutable) { - String segmentId = executable.getParam(SEGMENT_ID); ExecutableState state = executable.getStatus(); if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { - if (segmentWithLocks.contains(segmentId)) { - logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); - jobLock.unlock(getLockPath(segmentId)); - segmentWithLocks.remove(segmentId); + if (jobWithLocks.contains(executable.getId())) { + logger.info( + executable.toString() + " will release the lock for the job: " + executable.getId()); + jobLock.unlock(getLockPath(executable.getId())); + jobWithLocks.remove(executable.getId()); } } } } } - //when the segment lock released but the segment related job still running, resume the job. + //when the job lock released but the related job still running, resume the job. private class WatcherProcessImpl implements DistributedLock.Watcher { private String serverName; @@ -197,26 +150,21 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn @Override public void onUnlock(String path, String nodeData) { String[] paths = path.split("/"); - String segmentId = paths[paths.length - 1]; - - for (final String id : executableManager.getAllJobIds()) { - final Output output = executableManager.getOutput(id); - if (output.getState() == ExecutableState.RUNNING) { - AbstractExecutable executable = executableManager.getJob(id); - if (executable instanceof DefaultChainedExecutable - && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) - && !nodeData.equalsIgnoreCase(serverName)) { - try { - logger.warn(nodeData + " has released the lock for: " + segmentId - + " but the job still running. so " + serverName + " resume the job"); - if (!jobLock.isLocked(getLockPath(segmentId))) { - executableManager.resumeRunningJobForce(executable.getId()); - fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); - break; - } - } catch (Exception e) { - logger.error("resume the job but fail in server: " + serverName, e); + String jobId = paths[paths.length - 1]; + + final Output output = executableManager.getOutput(jobId); + if (output.getState() == ExecutableState.RUNNING) { + AbstractExecutable executable = executableManager.getJob(jobId); + if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) { + try { + logger.warn(nodeData + " has released the lock for: " + jobId + + " but the job still running. so " + serverName + " resume the job"); + if (!jobLock.isLocked(getLockPath(jobId))) { + executableManager.resumeRunningJobForce(executable.getId()); + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } + } catch (Exception e) { + logger.error("resume the job but fail in server: " + serverName, e); } } } @@ -273,7 +221,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn int pollSecond = jobEngineConfig.getPollIntervalSecond(); logger.info("Fetching jobs every {} seconds", pollSecond); - fetcher = new FetcherRunner(); + JobExecutor jobExecutor = new JobExecutor() { + @Override + public void execute(AbstractExecutable executable) { + jobPool.execute(new JobRunner(executable)); + } + }; + fetcher = jobEngineConfig.getJobPriorityConsidered() + ? new PriorityFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor) + : new DefaultFetcherRunner(jobEngineConfig, context, executableManager, jobExecutor); fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS); hasStarted = true; @@ -286,7 +242,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn AbstractExecutable executable = executableManager.getJob(id); if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) { try { - if (!jobLock.isLocked(getLockPath(executable.getParam(SEGMENT_ID)))) { + if (!jobLock.isLocked(getLockPath(executable.getId()))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); } @@ -334,8 +290,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } private void releaseAllLocks() { - for (String segmentId : segmentWithLocks) { - jobLock.unlock(getLockPath(segmentId)); + for (String jobId : jobWithLocks) { + jobLock.unlock(getLockPath(jobId)); } } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java new file mode 100644 index 0000000..d98ca33 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Map; + +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public abstract class FetcherRunner implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class); + + protected JobEngineConfig jobEngineConfig; + protected DefaultContext context; + protected ExecutableManager executableManager; + protected JobExecutor jobExecutor; + protected volatile boolean fetchFailed = false; + + public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, ExecutableManager executableManager, + JobExecutor jobExecutor) { + this.jobEngineConfig = jobEngineConfig; + this.context = context; + this.executableManager = executableManager; + this.jobExecutor = jobExecutor; + } + + protected boolean isJobPoolFull() { + Map<String, Executable> runningJobs = context.getRunningJobs(); + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return true; + } + + return false; + } + + protected void addToJobPool(AbstractExecutable executable, int priority) { + String jobDesc = executable.toString(); + logger.info(jobDesc + " prepare to schedule and its priority is " + priority); + try { + context.addRunningJob(executable); + jobExecutor.execute(executable); + logger.info(jobDesc + " scheduled"); + } catch (Exception ex) { + context.removeRunningJob(executable); + logger.warn(jobDesc + " fail to schedule", ex); + } + } + + @VisibleForTesting + void setFetchFailed(boolean fetchFailed) { + this.fetchFailed = fetchFailed; + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java new file mode 100644 index 0000000..d2efd22 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/JobExecutor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.impl.threadpool; + +import org.apache.kylin.job.execution.AbstractExecutable; + +public interface JobExecutor { + void execute(AbstractExecutable executable); +} diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java new file mode 100644 index 0000000..b562fac --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.impl.threadpool; + +import java.util.Comparator; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class PriorityFetcherRunner extends FetcherRunner { + + private static final Logger logger = LoggerFactory.getLogger(PriorityFetcherRunner.class); + + private volatile PriorityQueue<Pair<AbstractExecutable, Integer>> jobPriorityQueue = new PriorityQueue<>(1, + new Comparator<Pair<AbstractExecutable, Integer>>() { + @Override + public int compare(Pair<AbstractExecutable, Integer> o1, Pair<AbstractExecutable, Integer> o2) { + return o1.getSecond() > o2.getSecond() ? -1 : 1; + } + }); + + public PriorityFetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, + ExecutableManager executableManager, JobExecutor jobExecutor) { + super(jobEngineConfig, context, executableManager, jobExecutor); + } + + @Override + synchronized public void run() { + try (SetThreadName ignored = new SetThreadName(// + "PriorityFetcherRunner %s", System.identityHashCode(this))) {// + // logger.debug("Job Fetcher is running..."); + + // fetch job from jobPriorityQueue first to reduce chance to scan job list + Map<String, Integer> leftJobPriorities = Maps.newHashMap(); + Pair<AbstractExecutable, Integer> executableWithPriority; + while ((executableWithPriority = jobPriorityQueue.peek()) != null + // the priority of jobs in pendingJobPriorities should be above a threshold + && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) { + executableWithPriority = jobPriorityQueue.poll(); + AbstractExecutable executable = executableWithPriority.getFirst(); + int curPriority = executableWithPriority.getSecond(); + // the job should wait more than one time + if (curPriority > executable.getDefaultPriority() + 1) { + addToJobPool(executable, curPriority); + } else { + leftJobPriorities.put(executable.getId(), curPriority + 1); + } + } + + Map<String, Executable> runningJobs = context.getRunningJobs(); + if (isJobPoolFull()) { + return; + } + + while ((executableWithPriority = jobPriorityQueue.poll()) != null) { + leftJobPriorities.put(executableWithPriority.getFirst().getId(), + executableWithPriority.getSecond() + 1); + } + + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + for (final String id : executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + // logger.debug("Job id:" + id + " is already running"); + nRunning++; + continue; + } + + final Output output = executableManager.getOutput(id); + if ((output.getState() != ExecutableState.READY)) { + // logger.debug("Job id:" + id + " not runnable"); + if (output.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (output.getState() == ExecutableState.ERROR) { + nError++; + } else if (output.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + if (fetchFailed) { + executableManager.forceKillJob(id); + nError++; + } else { + nOthers++; + } + } + continue; + } + + AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + nOthers++; + continue; + } + + nReady++; + Integer priority = leftJobPriorities.get(id); + if (priority == null) { + priority = executable.getDefaultPriority(); + } + jobPriorityQueue.add(new Pair<>(executable, priority)); + } + + while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) { + addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond()); + } + + fetchFailed = false; + logger.info("Priority Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + + nStopped + " stopped, " + nReady + " ready, " + jobPriorityQueue.size() + " waiting, " // + + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + + " others"); + } catch (Throwable th) { + fetchFailed = true; // this could happen when resource store is unavailable + logger.warn("Priority Job Fetcher caught a exception " + th); + } + } +} diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index d7201f2..7c66f2c 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -129,7 +129,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase { AbstractExecutable job = execMgr.getJob(jobId); ExecutableState status = job.getStatus(); if (status == ExecutableState.RUNNING) { - scheduler.fetchFailed = true; + scheduler.getFetcherRunner().setFetchFailed(true); break; } Thread.sleep(1000);