KYLIN-2006 Make job engine distributed and HA

Signed-off-by: Yang Li <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7fe43179 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7fe43179 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7fe43179 Branch: refs/heads/KYLIN-2006 Commit: 7fe43179b1ee8ff98fa56d258470de8ade81253f Parents: f0804f9 Author: kangkaisen <kangkai...@live.com> Authored: Mon Sep 5 20:15:23 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Tue Nov 8 23:23:34 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 1 + .../kylin/job/execution/ExecutableManager.java | 23 ++ .../impl/threadpool/DistributedScheduler.java | 349 +++++++++++++++++++ .../kylin/job/lock/DistributedJobLock.java | 29 ++ .../org/apache/kylin/job/lock/DoWatchLock.java | 23 ++ .../kylin/job/BaseTestDistributedScheduler.java | 226 ++++++++++++ .../apache/kylin/job/ContextTestExecutable.java | 51 +++ .../job/ITDistributedSchedulerBaseTest.java | 90 +++++ .../job/ITDistributedSchedulerTakeOverTest.java | 60 ++++ .../kylin/rest/controller/JobController.java | 62 +--- .../apache/kylin/rest/service/CubeService.java | 4 + .../apache/kylin/rest/service/JobService.java | 96 ++++- .../hbase/util/ZookeeperDistributedJobLock.java | 230 ++++++++++++ 13 files changed, 1182 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6d3e807..ee9f57c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -755,6 +755,7 @@ abstract public class KylinConfigBase implements Serializable { public Map<Integer, String> getSchedulers() { Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler.")); r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler"); + r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler"); return r; } http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 0901443..92fc8c9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -235,11 +235,30 @@ public class ExecutableManager { } } + public void resumeRunningJobForce(String jobId) { + AbstractExecutable job = getJob(jobId); + if (job == null) { + return; + } + + if (job instanceof DefaultChainedExecutable) { + List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); + for (AbstractExecutable task : tasks) { + if (task.getStatus() == ExecutableState.RUNNING) { + updateJobOutput(task.getId(), ExecutableState.READY, null, null); + break; + } + } + } + updateJobOutput(jobId, ExecutableState.READY, null, null); + } + public void resumeJob(String jobId) 
{ AbstractExecutable job = getJob(jobId); if (job == null) { return; } + if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { @@ -254,6 +273,10 @@ public class ExecutableManager { public void discardJob(String jobId) { AbstractExecutable job = getJob(jobId); + if (job == null) { + return; + } + if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java new file mode 100644 index 0000000..11709c7 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.impl.threadpool; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.Scheduler; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.exception.SchedulerException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; +import org.apache.kylin.job.lock.DistributedJobLock; +import org.apache.kylin.job.lock.DoWatchLock; +import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.job.manager.ExecutableManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +/** + * schedule the cubing jobs when several job server running with the same metadata. + * + * to enable the distributed job server, you need to set and update three configs in the kylin.properties: + * 1. kylin.enable.scheduler=2 + * 2. kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock + * 3. 
add all the job servers and query servers to the kylin.rest.servers + */ +public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { + private ExecutableManager executableManager; + private FetcherRunner fetcher; + private ScheduledExecutorService fetcherPool; + private ExecutorService watchPool; + private ExecutorService jobPool; + private DefaultContext context; + private DistributedJobLock jobLock; + + private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); + private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); + //keep all segments having running job + private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>(); + private volatile boolean initialized = false; + private volatile boolean hasStarted = false; + private JobEngineConfig jobEngineConfig; + + private final static String SEGMENT_ID = "segmentId"; + + //only for it test + public static DistributedScheduler getInstance(KylinConfig config) { + DistributedScheduler r = CACHE.get(config); + if (r == null) { + synchronized (DistributedScheduler.class) { + r = CACHE.get(config); + if (r == null) { + r = new DistributedScheduler(); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + } + } + } + return r; + } + + private class FetcherRunner implements Runnable { + @Override + synchronized public void run() { + try { + Map<String, Executable> runningJobs = context.getRunningJobs(); + if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { + logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); + return; + } + + int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0; + for (final String id : executableManager.getAllJobIds()) { + if (runningJobs.containsKey(id)) { + nRunning++; + continue; + } + + final Output output = executableManager.getOutput(id); + + if ((output.getState() != ExecutableState.READY)) { + if (output.getState() == ExecutableState.RUNNING) { + nOtherRunning++; + } else { + nOthers++; + } + continue; + } + + nReady++; + final AbstractExecutable executable = executableManager.getJob(id); + try { + jobPool.execute(new JobRunner(executable)); + } catch (Exception ex) { + logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex); + } + } + logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others"); + } catch (Exception e) { + logger.warn("Job Fetcher caught a exception " + e); + } + } + } + + private String serverName = getServerName(); + + private String getServerName() { + String serverName = null; + try { + serverName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.error("fail to get the serverName"); + } + return serverName; + } + + //only for it test + public void setServerName(String serverName) { + this.serverName = serverName; + logger.info("serverName update to:" + this.serverName); + } + + private class JobRunner implements Runnable { + + private final AbstractExecutable executable; + + public JobRunner(AbstractExecutable executable) { + this.executable = executable; + } + + @Override + public void run() { + try { + String segmentId = executable.getParam(SEGMENT_ID); + if (jobLock.lockWithName(segmentId, serverName)) { + 
logger.info(executable.toString() + " scheduled in server: " + serverName); + + context.addRunningJob(executable); + segmentWithLocks.add(segmentId); + executable.execute(context); + } + } catch (ExecuteException e) { + logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e); + } catch (Exception e) { + logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e); + } finally { + context.removeRunningJob(executable); + releaseJobLock(executable); + // trigger the next step asap + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); + } + } + + //release job lock only when the all tasks of the job finish and the job server keep the cube lock. + private void releaseJobLock(AbstractExecutable executable) { + if (executable instanceof DefaultChainedExecutable) { + String segmentId = executable.getParam(SEGMENT_ID); + ExecutableState state = executable.getStatus(); + + if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) { + if (segmentWithLocks.contains(segmentId)) { + logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); + jobLock.unlockWithName(segmentId); + segmentWithLocks.remove(segmentId); + } + } + } + } + } + + //when the segment lock released but the segment related job still running, resume the job. + private class DoWatchImpl implements DoWatchLock { + private String serverName; + + public DoWatchImpl(String serverName) { + this.serverName = serverName; + } + + @Override + public void doWatch(String path, String nodeData) { + String[] paths = path.split("/"); + String segmentId = paths[paths.length - 1]; + + for (final String id : executableManager.getAllJobIds()) { + final Output output = executableManager.getOutput(id); + if (output.getState() == ExecutableState.RUNNING) { + AbstractExecutable executable = executableManager.getJob(id); + if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) { + try { + logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. 
so " + serverName + " resume the job"); + if (jobLock.lockWithName(segmentId, serverName)) { + executableManager.resumeRunningJobForce(executable.getId()); + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); + break; + } + } catch (Exception e) { + logger.error("resume the job but fail in server: " + serverName, e); + } + } + } + } + } + + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) { + try { + shutdown(); + } catch (SchedulerException e) { + throw new RuntimeException("failed to shutdown scheduler", e); + } + } + } + + @Override + public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException { + String serverMode = jobEngineConfig.getConfig().getServerMode(); + if (!("job".equals(serverMode.toLowerCase()) || "all".equals(serverMode.toLowerCase()))) { + logger.info("server mode: " + serverMode + ", no need to run job scheduler"); + return; + } + logger.info("Initializing Job Engine ...."); + + if (!initialized) { + initialized = true; + } else { + return; + } + + this.jobEngineConfig = jobEngineConfig; + this.jobLock = (DistributedJobLock) jobLock; + + executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig()); + //load all executable, set them to a consistent status + fetcherPool = Executors.newScheduledThreadPool(1); + + //watch the zookeeper node change, so that when one job server is down, other job servers can take over. + watchPool = Executors.newFixedThreadPool(1); + DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName); + this.jobLock.watchLock(watchPool, doWatchImpl); + + int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); + jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); + context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); + + resumeAllRunningJobs(); + + fetcher = new FetcherRunner(); + fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); + hasStarted = true; + } + + private void resumeAllRunningJobs() { + for (final String id : executableManager.getAllJobIds()) { + final Output output = executableManager.getOutput(id); + AbstractExecutable executable = executableManager.getJob(id); + if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) { + try { + if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName)) { + executableManager.resumeRunningJobForce(executable.getId()); + fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); + } + } catch (Exception e) { + logger.error("resume the job " + id + " fail in server: " + serverName, e); + } + } + } + } + + @Override + public void shutdown() throws SchedulerException { + logger.info("Will shut down Job Engine ...."); + + releaseAllLocks(); + logger.info("The all locks has released"); + + watchPool.shutdown(); + logger.info("The watchPool has down"); + + fetcherPool.shutdown(); + logger.info("The fetcherPool has down"); + + jobPool.shutdown(); + logger.info("The jobPoll has down"); + } + + private void releaseAllLocks() { + for (String segmentId : segmentWithLocks) { + jobLock.unlockWithName(segmentId); + } + } + + @Override + public boolean stop(AbstractExecutable executable) throws SchedulerException { + if (hasStarted) { + return true; + } else { + //TODO should 
try to stop this executable + return true; + } + } + + @Override + public boolean hasStarted() { + return this.hasStarted; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java new file mode 100644 index 0000000..5ba8426 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.lock; + +import java.util.concurrent.ExecutorService; + +public interface DistributedJobLock extends JobLock { + boolean lockWithName(String cubeName, String serverName); + + void unlockWithName(String name); + + void watchLock(ExecutorService pool, DoWatchLock doWatch); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java new file mode 100644 index 0000000..08c13f9 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.lock; + +public interface DoWatchLock { + void doWatch(String path, String data); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java new file mode 100644 index 0000000..c33f3da --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DistributedScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Arrays; + +public class BaseTestDistributedScheduler { + static ExecutableManager jobService; + static ZookeeperDistributedJobLock jobLock; + static DistributedScheduler scheduler1; + static DistributedScheduler scheduler2; + static KylinConfig kylinConfig1; + static KylinConfig kylinConfig2; + static CuratorFramework zkClient; + + static final String SEGMENT_ID = "segmentId"; + static final String segmentId1 = "segmentId1"; + static final String segmentId2 = "segmentId2"; + static final String serverName1 = "serverName1"; + static final String serverName2 = "serverName2"; + static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; + static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties"; + static final String 
confDstPath = "../examples/kylin.properties"; + static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox"; + + private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class); + + static { + try { + ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @BeforeClass + public static void setup() throws Exception { + staticCreateTestMetadata(SANDBOX_TEST_DATA); + System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock"); + + initZk(); + + kylinConfig1 = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig1); + for (String jobId : jobService.getAllJobIds()) { + jobService.deleteJob(jobId); + } + + jobLock = new ZookeeperDistributedJobLock(); + scheduler1 = DistributedScheduler.getInstance(kylinConfig1); + scheduler1.setServerName(serverName1); + scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock); + if (!scheduler1.hasStarted()) { + throw new RuntimeException("scheduler1 not started"); + } + + String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath(); + String absoluteConfDstPath = new File(confDstPath).getAbsolutePath(); + copyFile(absoluteConfSrcPath, absoluteConfDstPath); + kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath); + + scheduler2 = DistributedScheduler.getInstance(kylinConfig2); + scheduler2.setServerName(serverName2); + scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock); + if (!scheduler2.hasStarted()) { + throw new RuntimeException("scheduler2 not started"); + } + + Thread.sleep(10000); + } + + @AfterClass + public static void after() throws Exception { + System.clearProperty(KylinConfig.KYLIN_CONF); + System.clearProperty("kylin.job.controller.lock"); + + deleteFile(confDstPath); + } + + private static void staticCreateTestMetadata(String kylinConfigFolder) { + KylinConfig.destroyInstance(); + + if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null) + System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder); + } + + void waitForJobFinish(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + final ExecutableState status = job.getStatus(); + if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { + break; + } else { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + void waitForJobStatus(String jobId, ExecutableState state, long interval) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (state == job.getStatus()) { + break; + } else { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) { + return jobLock.lockWithName(cubeName, serverName); + } + + private static void initZk() { + String zkConnectString = getZKConnectString(); + if (StringUtils.isEmpty(zkConnectString)) { + throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); + } + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); + zkClient.start(); + } + + private static String getZKConnectString() { + Configuration conf = 
HBaseConnection.getCurrentHBaseConfiguration(); + final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } + + String getServerName(String cubeName) { + String lockPath = getLockPath(cubeName); + String serverName = null; + if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) { + try { + if (zkClient.checkExists().forPath(lockPath) != null) { + byte[] data = zkClient.getData().forPath(lockPath); + serverName = new String(data, Charset.forName("UTF-8")); + } + } catch (Exception e) { + logger.error("get the serverName failed", e); + } + } + return serverName; + } + + private String getLockPath(String pathName) { + return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + } + + private static void copyFile(String srcPath, String dstPath) { + try { + File srcFile = new File(srcPath); + File dstFile = new File(dstPath); + Files.copy(srcFile.toPath(), dstFile.toPath()); + } catch (Exception e) { + logger.error("copy the file failed", e); + } + } + + private static void deleteFile(String path) { + try { + Files.delete(new File(path).toPath()); + } catch (Exception e) { + logger.error("delete the file failed", e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java new file mode 100644 index 0000000..052baad --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.job.impl.threadpool.DefaultContext; + +public class ContextTestExecutable extends AbstractExecutable { + public ContextTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + + DefaultContext defaultContext = (DefaultContext) context; + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) { + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } else { + return new ExecuteResult(ExecuteResult.State.ERROR, "error"); + } + } + + private int getHashCode(KylinConfig config) { + return System.identityHashCode(config); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java new file mode 100644 index 0000000..443e73b --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.JobException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.junit.Assert; +import org.junit.Test; + +public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler { + @Test + public void testSchedulerLock() throws Exception { + if (!lock(jobLock, segmentId1, serverName1)) { + throw new JobException("fail to get the lock"); + } + DefaultChainedExecutable job = new DefaultChainedExecutable(); + job.setParam(SEGMENT_ID, segmentId1); + AbstractExecutable task1 = new SucceedTestExecutable(); + task1.setParam(SEGMENT_ID, segmentId1); + AbstractExecutable task2 = new SucceedTestExecutable(); + task2.setParam(SEGMENT_ID, segmentId1); + AbstractExecutable task3 = new SucceedTestExecutable(); + task3.setParam(SEGMENT_ID, segmentId1); + job.addTask(task1); + job.addTask(task2); + job.addTask(task3); + jobService.addJob(job); + + Assert.assertEquals(serverName1, getServerName(segmentId1)); + + waitForJobFinish(job.getId()); + + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + + Assert.assertEquals(null, getServerName(segmentId1)); + } + + @Test + public void testSchedulerConsistent() throws Exception { + if (!lock(jobLock, segmentId2, serverName1)) { + throw new JobException("fail to get the lock"); + } + DefaultChainedExecutable job = new DefaultChainedExecutable(); + job.setParam(SEGMENT_ID, segmentId2); + ContextTestExecutable task1 = new ContextTestExecutable(); + task1.setParam(SEGMENT_ID, segmentId2); + job.addTask(task1); + jobService.addJob(job); + + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + + if (!lock(jobLock, segmentId2, serverName2)) { + throw new JobException("fail to get the lock"); + } + + DefaultChainedExecutable job2 = new DefaultChainedExecutable(); + job2.setParam(SEGMENT_ID, segmentId2); + ContextTestExecutable task2 = new ContextTestExecutable(); + task2.setParam(SEGMENT_ID, segmentId2); + job2.addTask(task2); + jobService.addJob(job2); + + waitForJobFinish(job2.getId()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java new file mode 100644 index 0000000..3137aef --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.JobException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.junit.Assert; +import org.junit.Test; + +public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedScheduler { + @Test + public void testSchedulerTakeOver() throws Exception { + if (!lock(jobLock, segmentId2, serverName1)) { + throw new JobException("fail to get the lock"); + } + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + job.setParam(SEGMENT_ID, segmentId2); + AbstractExecutable task1 = new SucceedTestExecutable(); + task1.setParam(SEGMENT_ID, segmentId2); + AbstractExecutable task2 = new SucceedTestExecutable(); + task2.setParam(SEGMENT_ID, segmentId2); + AbstractExecutable task3 = new SucceedTestExecutable(); + task3.setParam(SEGMENT_ID, segmentId2); + job.addTask(task1); + job.addTask(task2); + job.addTask(task3); + jobService.addJob(job); + + waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); + + scheduler1.shutdown(); + scheduler1 = null; + + waitForJobFinish(job.getId()); + + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index 7022bfc..16b643c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -24,25 +24,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TimeZone; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.job.JobInstance; -import org.apache.kylin.job.Scheduler; -import org.apache.kylin.job.SchedulerFactory; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.SchedulerException; -import org.apache.kylin.job.execution.AbstractExecutable; -import 
org.apache.kylin.job.lock.JobLock; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.request.JobListRequest; import org.apache.kylin.rest.service.JobService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; @@ -50,64 +40,14 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; -/** - * - */ @Controller @RequestMapping(value = "jobs") -public class JobController extends BasicController implements InitializingBean { +public class JobController extends BasicController { private static final Logger logger = LoggerFactory.getLogger(JobController.class); @Autowired private JobService jobService; - private JobLock jobLock; - - /* - * (non-Javadoc) - * - * @see - * org.springframework.beans.factory.InitializingBean#afterPropertiesSet() - */ - @SuppressWarnings("unchecked") - @Override - public void afterPropertiesSet() throws Exception { - - String timeZone = jobService.getConfig().getTimeZone(); - TimeZone tzone = TimeZone.getTimeZone(timeZone); - TimeZone.setDefault(tzone); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(kylinConfig.getSchedulerType()); - - jobLock = (JobLock) ClassUtil.newInstance(kylinConfig.getJobControllerLock()); - - new Thread(new Runnable() { - @Override - public void run() { - try { - scheduler.init(new JobEngineConfig(kylinConfig), jobLock); - if (!scheduler.hasStarted()) { - logger.info("Job engine doesn't start in this node."); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }).start(); - - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - try { - scheduler.shutdown(); - } catch (SchedulerException e) { - logger.error("error occurred to shutdown scheduler", e); - } - } - })); - } - /** * get all cube jobs * http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 99e54b9..a6246f8 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -461,6 +461,8 @@ public class CubeService extends BasicService { } DefaultChainedExecutable job = new DefaultChainedExecutable(); + //make sure the job could be scheduled when the DistributedScheduler is enable. 
+ job.setParam("segmentId", tableName); job.setName("Hive Column Cardinality calculation for table '" + tableName + "'"); job.setSubmitter(submitter); @@ -471,6 +473,7 @@ public class CubeService extends BasicService { step1.setMapReduceJobClass(HiveColumnCardinalityJob.class); step1.setMapReduceParams(param); + step1.setParam("segmentId", tableName); job.addTask(step1); @@ -478,6 +481,7 @@ public class CubeService extends BasicService { step2.setJobClass(HiveColumnCardinalityUpdateJob.class); step2.setJobParams(param); + step2.setParam("segmentId", tableName); job.addTask(step2); getExecutableManager().addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 49b9b9f..a6a9842 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -19,6 +19,8 @@ package org.apache.kylin.rest.service; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Calendar; import java.util.Collections; import java.util.Date; @@ -26,8 +28,11 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; @@ -38,15 +43,21 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.Scheduler; +import org.apache.kylin.job.SchedulerFactory; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; +import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.JobException; +import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; +import org.apache.kylin.job.lock.DistributedJobLock; +import org.apache.kylin.job.lock.JobLock; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.constant.Constant; @@ -56,7 +67,9 @@ import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; @@ -69,15 +82,64 @@ import com.google.common.collect.Sets; 
/** * @author ysong1 */ + +@EnableAspectJAutoProxy(proxyTargetClass = true) @Component("jobService") -public class JobService extends BasicService { +public class JobService extends BasicService implements InitializingBean { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(JobService.class); + private JobLock jobLock; + @Autowired private AccessService accessService; + /* + * (non-Javadoc) + * + * @see + * org.springframework.beans.factory.InitializingBean#afterPropertiesSet() + */ + @SuppressWarnings("unchecked") + @Override + public void afterPropertiesSet() throws Exception { + + String timeZone = getConfig().getTimeZone(); + TimeZone tzone = TimeZone.getTimeZone(timeZone); + TimeZone.setDefault(tzone); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(kylinConfig.getSchedulerType()); + + jobLock = (JobLock) ClassUtil.newInstance(kylinConfig.getJobControllerLock()); + + new Thread(new Runnable() { + @Override + public void run() { + try { + scheduler.init(new JobEngineConfig(kylinConfig), jobLock); + if (!scheduler.hasStarted()) { + logger.info("scheduler has not been started"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }).start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + scheduler.shutdown(); + } catch (SchedulerException e) { + logger.error("error occurred to shutdown scheduler", e); + } + } + })); + } + public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException { Integer limit = (null == limitValue) ? 30 : limitValue; Integer offset = (null == offsetValue) ? 
0 : offsetValue; @@ -215,12 +277,15 @@ public class JobService extends BasicService { SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition); CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition); + lockSegment(newSeg.getUuid()); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); + lockSegment(newSeg.getUuid()); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset); + lockSegment(refreshSeg.getUuid()); job = EngineFactory.createBatchCubingJob(refreshSeg, submitter); } else { throw new JobException("invalid build type:" + buildType); @@ -363,6 +428,8 @@ public class JobService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public void resumeJob(JobInstance job) throws IOException, JobException { + lockSegment(job.getRelatedSegment()); + getExecutableManager().resumeJob(job.getId()); } @@ -380,7 +447,34 @@ public class JobService extends BasicService { } } getExecutableManager().discardJob(job.getId()); + + //release the segment lock when discarded the job but the job hasn't scheduled + releaseSegmentLock(job.getRelatedSegment()); + return job; } + private void lockSegment(String segmentId) throws JobException { + if (jobLock instanceof DistributedJobLock) { + if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) { + throw new JobException("Fail to get the segment lock, the segment may be building in another job server"); + } + } + } + + private void releaseSegmentLock(String segmentId) { + if (jobLock instanceof DistributedJobLock) { + ((DistributedJobLock) jobLock).unlockWithName(segmentId); + } + } + + private String getServerName() { + String serverName = null; + try { + serverName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.error("fail to get the hostname"); + } + return serverName; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7fe43179/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java new file mode 100644 index 0000000..eba7a20 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.util; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.job.lock.DistributedJobLock; +import org.apache.kylin.job.lock.DoWatchLock; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * the jobLock is specially used to support distributed scheduler. + */ + +public class ZookeeperDistributedJobLock implements DistributedJobLock { + private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); + + private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; + private static CuratorFramework zkClient; + private static PathChildrenCache childrenCache; + + static { + String zkConnectString = getZKConnectString(); + logger.info("zk connection string:" + zkConnectString); + if (StringUtils.isEmpty(zkConnectString)) { + throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); + } + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); + zkClient.start(); + + childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + childrenCache.close(); + zkClient.close(); + } catch (Exception e) { + logger.error("error occurred to close PathChildrenCache", e); + } + } + })); + } + + /** + * Lock the segment with the segmentId and serverName. + * + * <p> if the segment related job want to be scheduled, + * it must acquire the segment lock. segmentId is used to get the lock path, + * serverName marked which job server keep the segment lock. 
+ * + * @param segmentId the id of segment need to lock + * + * @param serverName the hostname of job server + * + * @return <tt>true</tt> if the segment locked successfully + */ + + @Override + public boolean lockWithName(String segmentId, String serverName) { + String lockPath = getLockPath(segmentId); + logger.info(serverName + " start lock the segment: " + segmentId); + + boolean hasLock = false; + try { + if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) { + logger.error("zookeeper have not start"); + return false; + } + if (zkClient.checkExists().forPath(lockPath) != null) { + if (hasLock(serverName, lockPath)) { + hasLock = true; + logger.info(serverName + " has kept the lock for segment: " + segmentId); + } + } else { + zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8"))); + if (hasLock(serverName, lockPath)) { + hasLock = true; + logger.info(serverName + " lock the segment: " + segmentId + " successfully"); + } + } + } catch (Exception e) { + logger.error(serverName + " error acquire lock for the segment: " + segmentId, e); + } + if (!hasLock) { + logger.info(serverName + " fail to acquire lock for the segment: " + segmentId); + return false; + } + return true; + } + + private boolean hasLock(String serverName, String lockPath) { + String lockServerName = null; + try { + if (zkClient.checkExists().forPath(lockPath) != null) { + byte[] data = zkClient.getData().forPath(lockPath); + lockServerName = new String(data, Charset.forName("UTF-8")); + } + } catch (Exception e) { + logger.error("fail to get the serverName for the path: " + lockPath, e); + } + return lockServerName.equalsIgnoreCase(serverName); + } + + /** + * release the segment lock with the segmentId. + * + * <p> the segment related zookeeper node will be deleted. + * + * @param segmentId the name of segment need to release the lock + */ + + @Override + public void unlockWithName(String segmentId) { + String lockPath = getLockPath(segmentId); + try { + if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) { + if (zkClient.checkExists().forPath(lockPath) != null) { + zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); + logger.info("the lock for " + segmentId + " release successfully"); + } else { + logger.info("the lock for " + segmentId + " has released"); + } + } + } catch (Exception e) { + logger.error("error release lock :" + segmentId); + throw new RuntimeException(e); + } + } + + /** + * watching all the locked segments related zookeeper nodes change, + * in order to when one job server is down, other job server can take over the running jobs. 
+ * + * @param pool the threadPool watching the zookeeper node change + * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data + */ + + @Override + public void watchLock(ExecutorService pool, final DoWatchLock doWatch) { + try { + childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + childrenCache.getListenable().addListener(new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_REMOVED: + doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); + break; + default: + break; + } + } + }, pool); + } catch (Exception e) { + logger.warn("watch the zookeeper node fail: " + e); + } + } + + private static String getZKConnectString() { + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } + + private String getLockPath(String pathName) { + return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + } + + private static String getWatchPath() { + return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + } + + @Override + public boolean lock() { + return true; + } + + @Override + public void unlock() { + + } +}
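For reference, the setup described in the DistributedScheduler javadoc above amounts to three kylin.properties entries on every job server. A minimal sketch, assuming two job servers on port 7070 (host names are illustrative, not part of this commit):

    kylin.enable.scheduler=2
    kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock
    kylin.rest.servers=host1:7070,host2:7070

With these set, each job server tries to take the per-segment ZooKeeper lock under /kylin/job_engine/lock/<metadata-url-prefix>/<segmentId> before running a job; the winner's host name is written into the ephemeral lock node, and the PathChildrenCache watcher added in ZookeeperDistributedJobLock lets a surviving server react to a CHILD_REMOVED event and resume the still-RUNNING job via resumeRunningJobForce.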