This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/master by this push:
     new 67151e2  KYLIN-4249 Fix CI test
67151e2 is described below

commit 67151e23f429d46c58215f90ffdee3ceb0b0b68e
Author: nichunen <n...@apache.org>
AuthorDate: Thu Dec 26 11:34:32 2019 +0800

    KYLIN-4249 Fix CI test
---
 .../job/impl/threadpool/DistributedScheduler.java | 245 +++++++++++----------
 1 file changed, 124 insertions(+), 121 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 4df9221..944e0b8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.SetThreadName;
@@ -60,22 +61,13 @@ import com.google.common.collect.Maps;
  * 2. add all the job servers and query servers to the kylin.server.cluster-servers
  */
 public class DistributedScheduler implements Scheduler<AbstractExecutable> {
-    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
-
     public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /${kylin.env.zookeeper-base-path}/metadata
-
-    public static DistributedScheduler getInstance(KylinConfig config) {
-        return config.getManager(DistributedScheduler.class);
-    }
-
-    // called by reflection
-    static DistributedScheduler newInstance(KylinConfig config) throws IOException {
-        return new DistributedScheduler();
-    }
+    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
+    //keep all running job
+    private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
+    private ExecutableManager executableManager;
 
     // ============================================================================
-
-    private ExecutableManager executableManager;
     private FetcherRunner fetcher;
     private ScheduledExecutorService fetcherPool;
     private ExecutorService watchPool;
@@ -83,109 +75,34 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
     private DefaultContext context;
     private DistributedLock jobLock;
     private Closeable lockWatch;
-
-    //keep all running job
-    private final Set<String> jobWithLocks = new CopyOnWriteArraySet<>();
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
     private JobEngineConfig jobEngineConfig;
     private String serverName;
 
-    private class JobRunner implements Runnable {
-
-        private final AbstractExecutable executable;
-
-        public JobRunner(AbstractExecutable executable) {
-            this.executable = executable;
-        }
-
-        @Override
-        public void run() {
-            try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
-                    System.identityHashCode(DistributedScheduler.this), executable.getId())) {
-
-                KylinConfig config = executable.getCubeSpecificConfig();
-                boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
-                        ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
-                logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
-                        + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName());
" is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName()); - if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) { - logger.info(executable.toString() + " scheduled in server: " + serverName); - - context.addRunningJob(executable); - jobWithLocks.add(executable.getId()); - executable.execute(context); - } - } catch (ExecuteException e) { - logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e); - } catch (Exception e) { - logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e); - } finally { - context.removeRunningJob(executable); - releaseJobLock(executable); - // trigger the next step asap - fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); - } - } - - //release job lock when job state is ready or running and the job server keep the cube lock. - private void releaseJobLock(AbstractExecutable executable) { - if (executable instanceof DefaultChainedExecutable) { - ExecutableState state = executable.getStatus(); - - if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { - if (jobWithLocks.contains(executable.getId())) { - logger.info( - executable.toString() + " will release the lock for the job: " + executable.getId()); - jobLock.unlock(getLockPath(executable.getId())); - jobWithLocks.remove(executable.getId()); - } - } - } - } + public static DistributedScheduler getInstance(KylinConfig config) { + return config.getManager(DistributedScheduler.class); } - //when the job lock released but the related job still running, resume the job. - private class WatcherProcessImpl implements DistributedLock.Watcher { - private String serverName; - - public WatcherProcessImpl(String serverName) { - this.serverName = serverName; - } - - @Override - public void onUnlock(String path, String nodeData) { - String[] paths = StringUtil.split(path, "/"); - String jobId = paths[paths.length - 1]; + // called by reflection + static DistributedScheduler newInstance(KylinConfig config) throws IOException { + return new DistributedScheduler(); + } - // Sync execute cache in case broadcast not available - try { - executableManager.syncDigestsOfJob(jobId); - } catch (PersistentException e) { - logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName); - } + public static String getLockPath(String pathName) { + return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName); + } - final Output output = executableManager.getOutput(jobId); - if (output.getState() == ExecutableState.RUNNING) { - AbstractExecutable executable = executableManager.getJob(jobId); - if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) { - try { - logger.warn(nodeData + " has released the lock for: " + jobId - + " but the job still running. 
so " + serverName + " resume the job"); - if (!jobLock.isLocked(getLockPath(jobId))) { - executableManager.resumeRunningJobForce(executable.getId()); - fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); - } - } catch (Exception e) { - logger.error("resume the job but fail in server: " + serverName, e); - } - } - } - } + private static String getWatchPath() { + return dropDoubleSlash(ZOOKEEPER_LOCK_PATH); + } - @Override - public void onLock(String lockPath, String client) { + public static String dropDoubleSlash(String path) { + for (int n = Integer.MAX_VALUE; n > path.length();) { + n = path.length(); + path = path.replace("//", "/"); } + return path; } @Override @@ -255,22 +172,6 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> { } } - public static String getLockPath(String pathName) { - return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName); - } - - private static String getWatchPath() { - return dropDoubleSlash(ZOOKEEPER_LOCK_PATH); - } - - public static String dropDoubleSlash(String path) { - for (int n = Integer.MAX_VALUE; n > path.length();) { - n = path.length(); - path = path.replace("//", "/"); - } - return path; - } - @Override public void shutdown() throws SchedulerException { logger.info("Will shut down Job Engine ...."); @@ -301,4 +202,106 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> { public boolean hasStarted() { return this.hasStarted; } + + private class JobRunner implements Runnable { + + private final AbstractExecutable executable; + + public JobRunner(AbstractExecutable executable) { + this.executable = executable; + } + + @Override + public void run() { + try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s", + System.identityHashCode(DistributedScheduler.this), executable.getId())) { + + boolean isAssigned = true; + if (!StringUtils.isEmpty(executable.getCubeName())) { + KylinConfig config = executable.getCubeSpecificConfig(); + isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(), + ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress()); + logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId() + + (isAssigned ? " is " : " is not ") + "assigned on this server : " + + ToolUtil.getHostName()); + } + + if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) { + logger.info(executable.toString() + " scheduled in server: " + serverName); + + context.addRunningJob(executable); + jobWithLocks.add(executable.getId()); + executable.execute(context); + } + } catch (ExecuteException e) { + logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e); + } catch (Exception e) { + logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e); + } finally { + context.removeRunningJob(executable); + releaseJobLock(executable); + // trigger the next step asap + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); + } + } + + //release job lock when job state is ready or running and the job server keep the cube lock. 
+        private void releaseJobLock(AbstractExecutable executable) {
+            if (executable instanceof DefaultChainedExecutable) {
+                ExecutableState state = executable.getStatus();
+
+                if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
+                    if (jobWithLocks.contains(executable.getId())) {
+                        logger.info(
+                                executable.toString() + " will release the lock for the job: " + executable.getId());
+                        jobLock.unlock(getLockPath(executable.getId()));
+                        jobWithLocks.remove(executable.getId());
+                    }
+                }
+            }
+        }
+    }
+
+    //when the job lock released but the related job still running, resume the job.
+    private class WatcherProcessImpl implements DistributedLock.Watcher {
+        private String serverName;
+
+        public WatcherProcessImpl(String serverName) {
+            this.serverName = serverName;
+        }
+
+        @Override
+        public void onUnlock(String path, String nodeData) {
+            String[] paths = StringUtil.split(path, "/");
+            String jobId = paths[paths.length - 1];
+
+            // Sync execute cache in case broadcast not available
+            try {
+                executableManager.syncDigestsOfJob(jobId);
+            } catch (PersistentException e) {
+                logger.error("Failed to sync cache of job: " + jobId + ", at server: " + serverName);
+            }
+
+            final Output output = executableManager.getOutput(jobId);
+            if (output.getState() == ExecutableState.RUNNING) {
+                AbstractExecutable executable = executableManager.getJob(jobId);
+                if (executable instanceof DefaultChainedExecutable && !nodeData.equalsIgnoreCase(serverName)) {
+                    try {
+                        logger.warn(nodeData + " has released the lock for: " + jobId
+                                + " but the job still running. so " + serverName + " resume the job");
+                        if (!jobLock.isLocked(getLockPath(jobId))) {
+                            executableManager.resumeRunningJobForce(executable.getId());
+                            fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                        }
+                    } catch (Exception e) {
+                        logger.error("resume the job but fail in server: " + serverName, e);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onLock(String lockPath, String client) {
+        }
+    }
 }
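
A note for readers skimming the diff: nearly all of the 245 changed lines are existing methods being reordered inside the class. The one functional change is in JobRunner.run(), where isAssigned now defaults to true and the cube-assignment check only runs when the executable actually carries a cube name (the new StringUtils.isEmpty guard); previously getCubeSpecificConfig() was called unconditionally, which misbehaves for executables that have no cube, presumably the case the CI test exercises. A minimal standalone sketch of the new guard follows; the class name and the isOnAssignedServer() stub are illustrative stand-ins, not Kylin API:

    // Illustrative sketch only, not Kylin code; the real check lives in
    // JobRunner.run() and consults KylinConfig.isOnAssignedServer(host, ip).
    public class AssignmentGuardSketch {

        // Hypothetical stub standing in for KylinConfig.isOnAssignedServer().
        static boolean isOnAssignedServer(String cubeName, String hostName) {
            return cubeName.length() % 2 == 0; // placeholder assignment policy
        }

        static boolean shouldRunHere(String cubeName, String hostName) {
            boolean isAssigned = true; // default: schedulable on any server
            // Mirrors the new StringUtils.isEmpty(...) guard in the diff.
            if (cubeName != null && !cubeName.isEmpty()) {
                isAssigned = isOnAssignedServer(cubeName, hostName);
            }
            return isAssigned;
        }

        public static void main(String[] args) {
            System.out.println(shouldRunHere(null, "server-1"));        // true
            System.out.println(shouldRunHere("", "server-1"));          // true
            System.out.println(shouldRunHere("test_cube", "server-1")); // policy-dependent
        }
    }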
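
The relocated lock-path helpers are pure string functions and easy to exercise in isolation. Below they are lifted verbatim from the diff into a hypothetical wrapper class (LockPathSketch and its main() are illustrative only):

    public class LockPathSketch {
        static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock";

        // Verbatim from the diff: keep collapsing "//" until the string
        // stops shrinking, i.e. until no "//" remains.
        public static String dropDoubleSlash(String path) {
            for (int n = Integer.MAX_VALUE; n > path.length();) {
                n = path.length();
                path = path.replace("//", "/");
            }
            return path;
        }

        public static String getLockPath(String pathName) {
            return dropDoubleSlash(ZOOKEEPER_LOCK_PATH + "/" + pathName);
        }

        public static void main(String[] args) {
            System.out.println(getLockPath("job-123"));       // /job_engine/lock/job-123
            System.out.println(dropDoubleSlash("/a///b//c")); // /a/b/c
        }
    }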
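
One caution when reading the moved code: the comment above releaseJobLock() ("release job lock when job state is ready or running...") states the condition backwards. The code keeps the ZooKeeper lock while a DefaultChainedExecutable is still READY or RUNNING and releases it in any other state. A compact restatement, with State as a simplified stand-in for Kylin's ExecutableState:

    public class LockReleaseSketch {
        // Simplified stand-in for Kylin's ExecutableState enum.
        enum State { READY, RUNNING, SUCCEED, ERROR, STOPPED, DISCARDED }

        // Mirrors releaseJobLock(): the lock is held only while the job
        // is still READY or RUNNING.
        static boolean shouldReleaseLock(State state) {
            return state != State.READY && state != State.RUNNING;
        }

        public static void main(String[] args) {
            System.out.println(shouldReleaseLock(State.RUNNING)); // false
            System.out.println(shouldReleaseLock(State.SUCCEED)); // true
        }
    }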