KYLIN-2006 fix test case
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aea46d7c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aea46d7c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aea46d7c Branch: refs/heads/KYLIN-2006 Commit: aea46d7ce2d3b6de1a3b5d79bf9715b3fbfaa680 Parents: 3aad93a Author: Yang Li <liy...@apache.org> Authored: Tue Nov 8 22:30:23 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Wed Nov 9 00:14:29 2016 +0800 ---------------------------------------------------------------------- .../common/util/AbstractKylinTestCase.java | 4 +- .../kylin/job/execution/AbstractExecutable.java | 4 + .../kylin/job/execution/ExecutableManager.java | 4 + .../impl/threadpool/DistributedScheduler.java | 6 +- .../kylin/job/BaseTestDistributedScheduler.java | 121 +++++++++---------- .../apache/kylin/job/ContextTestExecutable.java | 9 +- .../job/ITDistributedSchedulerBaseTest.java | 22 ++-- .../job/ITDistributedSchedulerTakeOverTest.java | 10 +- .../hbase/util/ZookeeperDistributedJobLock.java | 42 ++++--- 9 files changed, 115 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index 14bf90b..2154c32 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase { "org.apache.kylin.storage.hybrid.HybridManager", // "org.apache.kylin.metadata.realization.RealizationRegistry", // "org.apache.kylin.metadata.project.ProjectManager", // - "org.apache.kylin.metadata.MetadataManager" // + "org.apache.kylin.metadata.MetadataManager", // + "org.apache.kylin.job.impl.threadpool.DistributedScheduler", // + "org.apache.kylin.job.manager.ExecutableManager", // }; public abstract void createTestMetadata() throws Exception; http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 2a4b2df..9292418 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { this.config = config; } + protected KylinConfig getConfig() { + return config; + } + protected ExecutableManager getManager() { return ExecutableManager.getInstance(config); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 92fc8c9..1db612f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -65,6 +65,10 @@ public class ExecutableManager { return r; } + public static void clearCache() { + CACHE.clear(); + } + private ExecutableManager(KylinConfig config) { logger.info("Using metadata url: " + config); this.config = config; http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 17df119..3937a24 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -43,11 +43,11 @@ import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.DistributedJobLock; import org.apache.kylin.job.lock.JobLock; -import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +98,10 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn return r; } + public static void clearCache() { + CACHE.clear(); + } + private class FetcherRunner implements Runnable { @Override synchronized public void run() { http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index c33f3da..910db49 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -18,9 +18,13 @@ package org.apache.kylin.job; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import org.apache.commons.lang.StringUtils; +import java.io.File; +import java.nio.charset.Charset; +import java.util.Arrays; + +import javax.annotation.Nullable; + +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -29,12 +33,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DistributedScheduler; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock; import org.junit.AfterClass; @@ -42,14 +46,11 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.File; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.util.Arrays; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; -public class BaseTestDistributedScheduler { - static ExecutableManager jobService; +public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { + static ExecutableManager execMgr; static ZookeeperDistributedJobLock jobLock; static DistributedScheduler scheduler1; static DistributedScheduler scheduler2; @@ -62,35 +63,38 @@ public class BaseTestDistributedScheduler { static final String segmentId2 = "segmentId2"; static final String serverName1 = "serverName1"; static final String serverName2 = "serverName2"; - static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties"; - static final String confDstPath = "../examples/kylin.properties"; - static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox"; + static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties"; + static final String confDstPath2 = "target/kylin_metadata_dist_lock_test2/kylin.properties"; private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class); - static { - try { - ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath()); - } catch (Exception e) { - e.printStackTrace(); - } - } - @BeforeClass public static void setup() throws Exception { - staticCreateTestMetadata(SANDBOX_TEST_DATA); + staticCreateTestMetadata(); System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock"); + new File(confDstPath1).getParentFile().mkdirs(); + new File(confDstPath2).getParentFile().mkdirs(); + KylinConfig srcConfig = KylinConfig.getInstanceFromEnv(); + String backup = srcConfig.getMetadataUrl(); + srcConfig.setProperty("kylin.metadata.url", "kylin_metadata_dist_lock_test@hbase"); + srcConfig.writeProperties(new File(confDstPath1)); + srcConfig.writeProperties(new File(confDstPath2)); + srcConfig.setProperty("kylin.metadata.url", backup); + kylinConfig1 = KylinConfig.createInstanceFromUri(new File(confDstPath1).getAbsolutePath()); + kylinConfig2 = KylinConfig.createInstanceFromUri(new File(confDstPath2).getAbsolutePath()); + initZk(); - kylinConfig1 = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig1); - for (String jobId : jobService.getAllJobIds()) { - jobService.deleteJob(jobId); + if (jobLock == null) + jobLock = new ZookeeperDistributedJobLock(kylinConfig1); + + execMgr = ExecutableManager.getInstance(kylinConfig1); + for (String jobId : execMgr.getAllJobIds()) { + execMgr.deleteJob(jobId); } - jobLock = new ZookeeperDistributedJobLock(); scheduler1 = DistributedScheduler.getInstance(kylinConfig1); scheduler1.setServerName(serverName1); scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock); @@ -98,11 +102,6 @@ public class BaseTestDistributedScheduler { throw new RuntimeException("scheduler1 not started"); } - String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath(); - String absoluteConfDstPath = new File(confDstPath).getAbsolutePath(); - copyFile(absoluteConfSrcPath, absoluteConfDstPath); - kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath); - scheduler2 = DistributedScheduler.getInstance(kylinConfig2); scheduler2.setServerName(serverName2); scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock); @@ -115,22 +114,30 @@ public class BaseTestDistributedScheduler { @AfterClass public static void after() throws Exception { - System.clearProperty(KylinConfig.KYLIN_CONF); + if (scheduler1 != null) { + scheduler1.shutdown(); + scheduler1 = null; + } + if (scheduler2 != null) { + scheduler2.shutdown(); + scheduler2 = null; + } + if (jobLock != null) { + jobLock.close(); + jobLock = null; + } + if (zkClient != null) { + zkClient.close(); + zkClient = null; + } + System.clearProperty("kylin.job.controller.lock"); - - deleteFile(confDstPath); - } - - private static void staticCreateTestMetadata(String kylinConfigFolder) { - KylinConfig.destroyInstance(); - - if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null) - System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder); + staticCleanupTestMetadata(); } void waitForJobFinish(String jobId) { while (true) { - AbstractExecutable job = jobService.getJob(jobId); + AbstractExecutable job = execMgr.getJob(jobId); final ExecutableState status = job.getStatus(); if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) { break; @@ -146,7 +153,7 @@ public class BaseTestDistributedScheduler { void waitForJobStatus(String jobId, ExecutableState state, long interval) { while (true) { - AbstractExecutable job = jobService.getJob(jobId); + AbstractExecutable job = execMgr.getJob(jobId); if (state == job.getStatus()) { break; } else { @@ -177,7 +184,7 @@ public class BaseTestDistributedScheduler { Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { @Nullable @Override public String apply(String input) { @@ -203,24 +210,6 @@ public class BaseTestDistributedScheduler { } private String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; - } - - private static void copyFile(String srcPath, String dstPath) { - try { - File srcFile = new File(srcPath); - File dstFile = new File(dstPath); - Files.copy(srcFile.toPath(), dstFile.toPath()); - } catch (Exception e) { - logger.error("copy the file failed", e); - } - } - - private static void deleteFile(String path) { - try { - Files.delete(new File(path).toPath()); - } catch (Exception e) { - logger.error("delete the file failed", e); - } + return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java index 052baad..4696e67 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java @@ -18,12 +18,10 @@ package org.apache.kylin.job; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.job.impl.threadpool.DefaultContext; public class ContextTestExecutable extends AbstractExecutable { public ContextTestExecutable() { @@ -33,19 +31,14 @@ public class ContextTestExecutable extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - DefaultContext defaultContext = (DefaultContext) context; try { Thread.sleep(1000); } catch (InterruptedException e) { } - if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) { + if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) { return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } else { return new ExecuteResult(ExecuteResult.State.ERROR, "error"); } } - - private int getHashCode(KylinConfig config) { - return System.identityHashCode(config); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java index 443e73b..0d5e011 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java @@ -42,16 +42,16 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler job.addTask(task1); job.addTask(task2); job.addTask(task3); - jobService.addJob(job); + execMgr.addJob(job); Assert.assertEquals(serverName1, getServerName(segmentId1)); waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); Assert.assertEquals(null, getServerName(segmentId1)); } @@ -66,11 +66,11 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler ContextTestExecutable task1 = new ContextTestExecutable(); task1.setParam(SEGMENT_ID, segmentId2); job.addTask(task1); - jobService.addJob(job); + execMgr.addJob(job); waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); if (!lock(jobLock, segmentId2, serverName2)) { throw new JobException("fail to get the lock"); @@ -81,10 +81,10 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler ContextTestExecutable task2 = new ContextTestExecutable(); task2.setParam(SEGMENT_ID, segmentId2); job2.addTask(task2); - jobService.addJob(job2); + execMgr.addJob(job2); waitForJobFinish(job2.getId()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job2.getId()).getState()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java index 3137aef..2b15ddd 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java @@ -43,7 +43,7 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched job.addTask(task1); job.addTask(task2); job.addTask(task3); - jobService.addJob(job); + execMgr.addJob(job); waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); @@ -52,9 +52,9 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java index d8d27c5..613d783 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -52,11 +52,19 @@ import com.google.common.collect.Iterables; public class ZookeeperDistributedJobLock implements DistributedJobLock { private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); - private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; - private static CuratorFramework zkClient; - private static PathChildrenCache childrenCache; + public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; + + final private KylinConfig config; + final CuratorFramework zkClient; + final PathChildrenCache childrenCache; + + public ZookeeperDistributedJobLock() { + this(KylinConfig.getInstanceFromEnv()); + } + + public ZookeeperDistributedJobLock(KylinConfig config) { + this.config = config; - static { String zkConnectString = getZKConnectString(); logger.info("zk connection string:" + zkConnectString); if (StringUtils.isEmpty(zkConnectString)) { @@ -71,12 +79,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - try { - childrenCache.close(); - zkClient.close(); - } catch (Exception e) { - logger.error("error occurred to close PathChildrenCache", e); - } + close(); } })); } @@ -200,7 +203,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { @Nullable @Override public String apply(String input) { @@ -210,11 +213,11 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { } private String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName; } - private static String getWatchPath() { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + private String getWatchPath() { + return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix(); } @Override @@ -226,4 +229,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { public void unlock() { } + + public void close() { + try { + childrenCache.close(); + zkClient.close(); + } catch (Exception e) { + logger.error("error occurred to close PathChildrenCache", e); + } + } }