KYLIN-2506 Refactor ZookeeperDistributedJobLock
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd3b7655 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd3b7655 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd3b7655 Branch: refs/heads/KYLIN-2506 Commit: fd3b7655839af8dcd2370570ed9e426db410eb12 Parents: ce8b24f Author: kangkaisen <kangkai...@163.com> Authored: Fri Apr 7 15:45:43 2017 +0800 Committer: kangkaisen <kangkai...@163.com> Committed: Thu Apr 13 16:50:49 2017 +0800 ---------------------------------------------------------------------- core-common/pom.xml | 5 + .../kylin/common/lock/DistributedJobLock.java | 38 +++++ .../org/apache/kylin/common/lock/JobLock.java | 26 +++ .../apache/kylin/common/lock/MockJobLock.java | 33 ++++ .../model/validation/rule/DictionaryRule.java | 4 +- .../validation/rule/DictionaryRuleTest.java | 12 +- .../kylin/dict/AppendTrieDictionaryBuilder.java | 1 - .../java/org/apache/kylin/job/Scheduler.java | 2 +- .../job/impl/threadpool/DefaultScheduler.java | 2 +- .../impl/threadpool/DistributedScheduler.java | 43 +++-- .../kylin/job/lock/DistributedJobLock.java | 36 ---- .../java/org/apache/kylin/job/lock/JobLock.java | 27 --- .../org/apache/kylin/job/lock/MockJobLock.java | 33 ---- .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../test_case_data/localmeta/kylin.properties | 4 +- .../test_case_data/sandbox/kylin.properties | 2 +- .../kylin/job/BaseTestDistributedScheduler.java | 4 +- .../apache/kylin/rest/service/JobService.java | 2 +- .../hbase/util/ZookeeperDistributedJobLock.java | 164 +++++++++---------- .../storage/hbase/util/ZookeeperJobLock.java | 2 +- 20 files changed, 225 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/pom.xml ---------------------------------------------------------------------- diff --git a/core-common/pom.xml b/core-common/pom.xml index 95d3c29..5b5f78b 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -69,6 +69,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java new file mode 100644 index 0000000..00d1ca4 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedJobLock.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.lock; + +import org.apache.curator.framework.recipes.cache.PathChildrenCache; + +import java.util.concurrent.Executor; + +public interface DistributedJobLock extends JobLock { + + boolean lockWithClient(String lockPath, String lockClient); + + boolean isHasLocked(String lockPath); + + void unlock(String lockPath); + + PathChildrenCache watch(String watchPath, Executor watchExecutor, WatcherProcess process); + + public interface WatcherProcess { + void process(String path, String data); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java new file mode 100644 index 0000000..5802d71 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/lock/JobLock.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.lock; + + +public interface JobLock { + boolean lock(); + + void unlock(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java new file mode 100644 index 0000000..f8233be --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/lock/MockJobLock.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.lock; + +/** + */ +public class MockJobLock implements JobLock { + @Override + public boolean lock() { + return true; + } + + @Override + public void unlock() { + return; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java index 8da3ca0..4392e5a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java @@ -29,7 +29,6 @@ import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ResultLevel; import org.apache.kylin.cube.model.validation.ValidateContext; -import org.apache.kylin.dict.GlobalDictionaryBuilder; import org.apache.kylin.metadata.model.TblColRef; /** @@ -47,6 +46,7 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> { static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: "; static final String ERROR_TRANSITIVE_REUSE = "Transitive REUSE is not allowed for dictionary: "; static final String ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE = "Global dictionary couldn't be used for dimension column: "; + static final String GLOBAL_DICT_BUILDER_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; @Override public void validate(CubeDesc cubeDesc, ValidateContext context) { @@ -82,7 +82,7 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> { return; } - if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol)) { + if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GLOBAL_DICT_BUILDER_CLASS) && dimensionColumns.contains(dictCol)) { context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol); return; } http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java index fcb723e..dc90a69 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java @@ -35,7 +35,6 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.validation.ValidateContext; -import org.apache.kylin.dict.GlobalDictionaryBuilder; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -73,7 +72,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase { @Test public void testBadDesc() throws IOException { testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass")); - testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName())); + testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS)); } @Test @@ -88,20 +87,17 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase { @Test public void testBadDesc4() throws IOException { - testDictionaryDesc(ERROR_TRANSITIVE_REUSE, - DictionaryDesc.create("lstg_site_id", "SELLER_ID", null), - DictionaryDesc.create("price", "lstg_site_id", null)); + testDictionaryDesc(ERROR_TRANSITIVE_REUSE, DictionaryDesc.create("lstg_site_id", "SELLER_ID", null), DictionaryDesc.create("price", "lstg_site_id", null)); } @Test public void testBadDesc5() throws IOException { - testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE, - DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName())); + testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE, DictionaryDesc.create("CATEG_LVL2_NAME", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS)); } @Test public void testGoodDesc2() throws IOException { - testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName())); + testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, DictionaryRule.GLOBAL_DICT_BUILDER_CLASS)); } private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java index bfd664f..c35a815 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java @@ -18,7 +18,6 @@ package org.apache.kylin.dict; -import com.google.common.base.Preconditions; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BytesUtil; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/Scheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java index 93d2510..e2cfd44 100644 --- a/core-job/src/main/java/org/apache/kylin/job/Scheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/Scheduler.java @@ -21,7 +21,7 @@ package org.apache.kylin.job; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.common.lock.JobLock; /** */ http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 403abc4..688708e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -40,7 +40,7 @@ import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.common.lock.JobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 1f2e958..b99da7c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -18,6 +18,7 @@ package org.apache.kylin.job.impl.threadpool; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; @@ -33,6 +34,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.kylin.common.KylinConfig; @@ -48,8 +50,8 @@ import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.lock.DistributedJobLock; -import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.common.lock.DistributedJobLock; +import org.apache.kylin.common.lock.JobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private ExecutorService jobPool; private DefaultContext context; private DistributedJobLock jobLock; + private PathChildrenCache lockWatch; private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class); private static final ConcurrentMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>(); @@ -81,6 +84,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private JobEngineConfig jobEngineConfig; private final static String SEGMENT_ID = "segmentId"; + public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; //only for it test public static DistributedScheduler getInstance(KylinConfig config) { @@ -177,7 +181,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn public void run() { try (SetThreadName ignored = new SetThreadName("Job %s", executable.getId())) { String segmentId = executable.getParam(SEGMENT_ID); - if (jobLock.lockWithName(segmentId, serverName)) { + if (jobLock.lockWithClient(getLockPath(segmentId), serverName)) { logger.info(executable.toString() + " scheduled in server: " + serverName); context.addRunningJob(executable); @@ -205,7 +209,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn if (state != ExecutableState.READY && state != ExecutableState.RUNNING) { if (segmentWithLocks.contains(segmentId)) { logger.info(executable.toString() + " will release the lock for the segment: " + segmentId); - jobLock.unlockWithName(segmentId); + jobLock.unlock(getLockPath(segmentId)); segmentWithLocks.remove(segmentId); } } @@ -214,15 +218,15 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } //when the segment lock released but the segment related job still running, resume the job. - private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock { + private class WatcherProcessImpl implements org.apache.kylin.common.lock.DistributedJobLock.WatcherProcess { private String serverName; - public DoWatchImpl(String serverName) { + public WatcherProcessImpl(String serverName) { this.serverName = serverName; } @Override - public void doWatch(String path, String nodeData) { + public void process(String path, String nodeData) { String[] paths = path.split("/"); String segmentId = paths[paths.length - 1]; @@ -233,7 +237,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) { try { logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job"); - if (!jobLock.isHasLocked(segmentId)) { + if (!jobLock.isHasLocked(getLockPath(segmentId))) { executableManager.resumeRunningJobForce(executable.getId()); fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); break; @@ -283,8 +287,8 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn //watch the zookeeper node change, so that when one job server is down, other job servers can take over. watchPool = Executors.newFixedThreadPool(1); - DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName); - this.jobLock.watchLock(watchPool, doWatchImpl); + WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName); + lockWatch = this.jobLock.watch(getWatchPath(), watchPool, watcherProcess); int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); @@ -314,16 +318,27 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } } + private String getLockPath(String pathName) { + return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix() + "/" + pathName; + } + + private String getWatchPath() { + return ZOOKEEPER_LOCK_PATH + "/" + jobEngineConfig.getConfig().getMetadataUrlPrefix(); + } + @Override public void shutdown() throws SchedulerException { logger.info("Will shut down Job Engine ...."); + try { + lockWatch.close(); + } catch (IOException e) { + throw new SchedulerException(e); + } + releaseAllLocks(); logger.info("The all locks has released"); - watchPool.shutdown(); - logger.info("The watchPool has down"); - fetcherPool.shutdown(); logger.info("The fetcherPool has down"); @@ -333,7 +348,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private void releaseAllLocks() { for (String segmentId : segmentWithLocks) { - jobLock.unlockWithName(segmentId); + jobLock.unlock(getLockPath(segmentId)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java deleted file mode 100644 index 1c173ec..0000000 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.lock; - -import java.util.concurrent.ExecutorService; - -public interface DistributedJobLock extends JobLock { - - boolean lockWithName(String name, String serverName); - - boolean isHasLocked(String segmentId); - - void unlockWithName(String name); - - void watchLock(ExecutorService pool, DoWatchLock doWatch); - - public interface DoWatchLock { - void doWatch(String path, String data); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java deleted file mode 100644 index bbfb801..0000000 --- a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.lock; - -/** - */ -public interface JobLock { - boolean lock(); - - void unlock(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java deleted file mode 100644 index cac17b9..0000000 --- a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.lock; - -/** - */ -public class MockJobLock implements JobLock { - @Override - public boolean lock() { - return true; - } - - @Override - public void unlock() { - return; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 1ada9a1..1bafa34 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -28,7 +28,7 @@ import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.lock.MockJobLock; +import org.apache.kylin.common.lock.MockJobLock; import org.junit.After; import org.junit.Before; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index 9f7b24c..3866575 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -133,7 +133,7 @@ kylin.security.saml.context-path=/kylin kylin.test.bcc.new.key=some-value kylin.engine.mr.config-override.test1=test1 kylin.engine.mr.config-override.test2=test2 -kylin.job.lock=org.apache.kylin.job.lock.MockJobLock +kylin.job.lock=org.apache.kylin.common.lock.MockJobLock kylin.engine.provider.0=org.apache.kylin.engine.mr.MRBatchCubingEngine -kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2 +kylin.engine.provider.2=org.apache.kylin.engine.mr.MRBatchCubingEngine2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 684b4dd..c0a4968 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -112,7 +112,7 @@ kylin.query.udf.concat=org.apache.kylin.query.udf.ConcatUDF kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF # for test -kylin.job.lock=org.apache.kylin.job.lock.MockJobLock +kylin.job.lock=org.apache.kylin.common.lock.MockJobLock kylin.engine.mr.uhc-reducer-count=3 ### CUBE ### http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index 2d79970..4877ca1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -167,7 +167,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { } boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) { - return jobLock.lockWithName(cubeName, serverName); + return jobLock.lockWithClient(getLockPath(cubeName), serverName); } private static void initZk() { @@ -197,6 +197,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { } private String getLockPath(String pathName) { - return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName; + return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 4ba426e..31d1ded 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -51,7 +51,7 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.common.lock.JobLock; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.constant.Constant; http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java index 983bfd9..5f5a721 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -19,35 +19,30 @@ package org.apache.kylin.storage.hbase.util; import java.nio.charset.Charset; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.lock.DistributedJobLock; +import org.apache.kylin.common.lock.DistributedJobLock; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * the jobLock is specially used to support distributed scheduler. - */ - public class ZookeeperDistributedJobLock implements DistributedJobLock { private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); - public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; - - final private KylinConfig config; - final CuratorFramework zkClient; - final PathChildrenCache childrenCache; + private final KylinConfig config; + private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>(); + private final CuratorFramework zkClient; public ZookeeperDistributedJobLock() { this(KylinConfig.getInstanceFromEnv()); @@ -57,16 +52,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { this.config = config; String zkConnectString = ZookeeperUtil.getZKConnectString(); - logger.info("zk connection string:" + zkConnectString); if (StringUtils.isEmpty(zkConnectString)) { throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); } - RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); - zkClient.start(); + zkClient = getZKClient(config, zkConnectString); - childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { @@ -75,97 +66,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { })); } + //make the zkClient to be singleton + private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) { + CuratorFramework zkClient = CACHE.get(config); + if (zkClient == null) { + synchronized (ZookeeperDistributedJobLock.class) { + zkClient = CACHE.get(config); + if (zkClient == null) { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy); + zkClient.start(); + CACHE.put(config, zkClient); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + } + } + } + return zkClient; + } + /** - * Lock the segment with the segmentId and serverName. - * - * <p> if the segment related job want to be scheduled, - * it must acquire the segment lock. segmentId is used to get the lock path, - * serverName marked which job server keep the segment lock. + * Try locking the path with the lockPath and lockClient, if lock successfully, + * the lockClient will write into the data of lockPath. * - * @param segmentId the id of segment need to lock + * @param lockPath the path will create in zookeeper * - * @param serverName the hostname of job server + * @param lockClient the mark of client * - * @return <tt>true</tt> if the segment locked successfully + * @return <tt>true</tt> if lock successfully or the lockClient has kept the lock * * @since 2.0 */ @Override - public boolean lockWithName(String segmentId, String serverName) { - String lockPath = getLockPath(segmentId); - logger.info(serverName + " start lock the segment: " + segmentId); + public boolean lockWithClient(String lockPath, String lockClient) { + logger.info(lockClient + " start lock the path: " + lockPath); boolean hasLock = false; try { - if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) { - logger.error("zookeeper have not start"); - return false; - } if (zkClient.checkExists().forPath(lockPath) != null) { - if (isKeepLock(serverName, lockPath)) { + if (isKeepLock(lockClient, lockPath)) { hasLock = true; - logger.info(serverName + " has kept the lock for segment: " + segmentId); + logger.info(lockClient + " has kept the lock for the path: " + lockPath); } } else { - zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8"))); - if (isKeepLock(serverName, lockPath)) { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8"))); + if (isKeepLock(lockClient, lockPath)) { hasLock = true; - logger.info(serverName + " lock the segment: " + segmentId + " successfully"); + logger.info(lockClient + " lock the path: " + lockPath + " successfully"); } } } catch (Exception e) { - logger.error(serverName + " error acquire lock for the segment: " + segmentId, e); - } - if (!hasLock) { - logger.info(serverName + " fail to acquire lock for the segment: " + segmentId); - return false; + logger.error(lockClient + " error acquire lock for the path: " + lockPath, e); } - return true; + return hasLock; } /** * - * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath + * Returns <tt>true</tt> if, the lockClient is keeping the lock for the lockPath * - * @param serverName the hostname of job server + * @param lockClient the mark of client * - * @param lockPath the zookeeper node path of segment + * @param lockPath the zookeeper node path for the lock * - * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise + * @return <tt>true</tt> if the lockClient is keeping the lock for the lockPath, otherwise * <tt>false</tt> * * @since 2.0 */ - private boolean isKeepLock(String serverName, String lockPath) { + private boolean isKeepLock(String lockClient, String lockPath) { try { if (zkClient.checkExists().forPath(lockPath) != null) { byte[] data = zkClient.getData().forPath(lockPath); String lockServerName = new String(data, Charset.forName("UTF-8")); - return lockServerName.equalsIgnoreCase(serverName); + return lockServerName.equalsIgnoreCase(lockClient); } } catch (Exception e) { - logger.error("fail to get the serverName for the path: " + lockPath, e); + logger.error("fail to get the lockClient for the path: " + lockPath, e); } return false; } /** * - * Returns <tt>true</tt> if, and only if, the segment has been locked. + * Returns <tt>true</tt> if, and only if, the path has been locked. * - * @param segmentId the id of segment need to release the lock. + * @param lockPath the zookeeper node path for the lock * - * @return <tt>true</tt> if the segment has been locked, otherwise + * @return <tt>true</tt> if the path has been locked, otherwise * <tt>false</tt> * * @since 2.0 */ @Override - public boolean isHasLocked(String segmentId) { - String lockPath = getLockPath(segmentId); + public boolean isHasLocked(String lockPath) { try { return zkClient.checkExists().forPath(lockPath) != null; } catch (Exception e) { @@ -175,71 +173,66 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { } /** - * release the segment lock with the segmentId. + * release the lock with the specific path. * - * <p> the segment related zookeeper node will be deleted. + * <p> the path related zookeeper node will be deleted. * - * @param segmentId the id of segment need to release the lock + * @param lockPath the zookeeper node path for the lock. * * @since 2.0 */ @Override - public void unlockWithName(String segmentId) { - String lockPath = getLockPath(segmentId); + public void unlock(String lockPath) { try { - if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) { - if (zkClient.checkExists().forPath(lockPath) != null) { - zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); - logger.info("the lock for " + segmentId + " release successfully"); - } else { - logger.info("the lock for " + segmentId + " has released"); - } + if (zkClient.checkExists().forPath(lockPath) != null) { + zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); + logger.info("the lock for " + lockPath + " release successfully"); + } else { + logger.info("the lock for " + lockPath + " has released"); } } catch (Exception e) { - logger.error("error release lock :" + segmentId); + logger.error("error release lock :" + lockPath); throw new RuntimeException(e); } } /** - * watching all the locked segments related zookeeper nodes change, - * in order to when one job server is down, other job server can take over the running jobs. + * watch the path so that when zookeeper node change, the client could receive the notification. + * Note: the client should close the PathChildrenCache in time. + * + * @param watchPath the path need to watch * - * @param pool the threadPool watching the zookeeper node change + * @param watchExecutor the executor watching the zookeeper node change * - * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data + * @param watcherProcess do the concrete action with the node path and node data when zookeeper node changed + * + * @return PathChildrenCache the client should close the PathChildrenCache in time * * @since 2.0 */ @Override - public void watchLock(ExecutorService pool, final DoWatchLock doWatch) { + public PathChildrenCache watch(String watchPath, Executor watchExecutor, final WatcherProcess watcherProcess) { + PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true); try { - childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - childrenCache.getListenable().addListener(new PathChildrenCacheListener() { + cache.start(); + cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_REMOVED: - doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); + watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); break; default: break; } } - }, pool); + }, watchExecutor); } catch (Exception e) { logger.warn("watch the zookeeper node fail: " + e); } - } - - private String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName; - } - - private String getWatchPath() { - return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix(); + return cache; } @Override @@ -253,7 +246,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { public void close() { try { - childrenCache.close(); zkClient.close(); } catch (Exception e) { logger.error("error occurred to close PathChildrenCache", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/fd3b7655/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java index 7bf7498..7315d1d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java @@ -35,7 +35,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.lock.JobLock; +import org.apache.kylin.common.lock.JobLock; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger;