KYLIN-2578 ensures zk path is prefix-ed with /kylin/metadata-url
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/263a1e88 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/263a1e88 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/263a1e88 Branch: refs/heads/KYLIN-2624 Commit: 263a1e8889ba223883ba42178debeee0bd02c240 Parents: d738544 Author: Yang Li <liy...@apache.org> Authored: Sun May 7 15:56:43 2017 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun May 7 15:56:43 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/lock/DistributedLock.java | 2 ++ .../kylin/dict/GlobalDictionaryBuilder.java | 4 +-- .../impl/threadpool/DistributedScheduler.java | 6 ++-- .../hbase/util/ZookeeperDistributedLock.java | 35 +++++++++++++++++--- .../util/ITZookeeperDistributedLockTest.java | 2 +- 5 files changed, 38 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java index e8844fd..8f1fae0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java +++ b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java @@ -23,6 +23,8 @@ import java.util.concurrent.Executor; /** * A distributed lock. Every instance is owned by a client, on whose behalf locks are acquired and/or released. + * + * Implementation must ensure all <code>lockPath</code> will be prefix-ed with "/kylin/metadata-prefix" automatically. */ public interface DistributedLock { http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 0ec7730..a593371 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -93,10 +93,8 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { return new AppendTrieDictionary<>(); } - private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock"; - private String getLockPath(String pathName) { - return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + "/lock"; + return "/dict/" + pathName + "/lock"; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index e80f485..0714d90 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -84,7 +84,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn private final static String SEGMENT_ID = "segmentId"; - public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; + public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // note ZookeeperDistributedLock will ensure zk path prefix: /kylin/metadata //only for it test public static DistributedScheduler getInstance(KylinConfig config) { @@ -305,11 +305,11 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn } public String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; + return ZOOKEEPER_LOCK_PATH + "/" + pathName; } private String getWatchPath() { - return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + return ZOOKEEPER_LOCK_PATH; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java index 9f692e4..d181d81 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java @@ -35,7 +35,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.lock.DistributedLockFactory; -import org.apache.kylin.job.impl.threadpool.DistributedScheduler; import org.apache.kylin.job.lock.JobLock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -44,6 +43,8 @@ import org.slf4j.LoggerFactory; /** * A distributed lock based on zookeeper. Every instance is owned by a client, on whose behalf locks are acquired and/or released. + * + * All <code>lockPath</code> will be prefix-ed with "/kylin/metadata-prefix" automatically. */ public class ZookeeperDistributedLock implements DistributedLock, JobLock { private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class); @@ -92,6 +93,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { return ZookeeperUtil.getZKConnectString(); } + final String zkPathBase; final CuratorFramework curator; public Factory() { @@ -100,25 +102,30 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { public Factory(KylinConfig config) { this.curator = getZKClient(config); + this.zkPathBase = "/kylin/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); } @Override public DistributedLock lockForClient(String client) { - return new ZookeeperDistributedLock(curator, client); + return new ZookeeperDistributedLock(curator, zkPathBase, client); } } // ============================================================================ final CuratorFramework curator; + final String zkPathBase; final String client; final byte[] clientBytes; - private ZookeeperDistributedLock(CuratorFramework curator, String client) { + private ZookeeperDistributedLock(CuratorFramework curator, String zkPathBase, String client) { if (client == null) throw new NullPointerException("client must not be null"); + if (zkPathBase == null) + throw new NullPointerException("zkPathBase must not be null"); this.curator = curator; + this.zkPathBase = zkPathBase; this.client = client; this.clientBytes = client.getBytes(Charset.forName("UTF-8")); } @@ -130,6 +137,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public boolean lock(String lockPath) { + lockPath = norm(lockPath); + logger.debug(client + " trying to lock " + lockPath); try { @@ -152,6 +161,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public boolean lock(String lockPath, long timeout) { + lockPath = norm(lockPath); + if (lock(lockPath)) return true; @@ -182,6 +193,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public String peekLock(String lockPath) { + lockPath = norm(lockPath); + try { byte[] bytes = curator.getData().forPath(lockPath); return new String(bytes, Charset.forName("UTF-8")); @@ -204,6 +217,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public void unlock(String lockPath) { + lockPath = norm(lockPath); + logger.debug(client + " trying to unlock " + lockPath); String owner = peekLock(lockPath); @@ -224,6 +239,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public void purgeLocks(String lockPathRoot) { + lockPathRoot = norm(lockPathRoot); + try { curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot); @@ -236,6 +253,8 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { @Override public Closeable watchLocks(String lockPathRoot, Executor executor, final Watcher watcher) { + lockPathRoot = norm(lockPathRoot); + PathChildrenCache cache = new PathChildrenCache(curator, lockPathRoot, true); try { cache.start(); @@ -259,6 +278,14 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { } return cache; } + + // normalize lock path + private String norm(String lockPath) { + if (lockPath.startsWith(zkPathBase)) + return lockPath; + else + return zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath; + } // ============================================================================ @@ -274,7 +301,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { } private String jobEngineLockPath() { - return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/global_engine_lock"; + return "/job_engine/global_job_engine_lock"; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java index 797b66b..1bfe8f7 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; public class ITZookeeperDistributedLockTest extends HBaseMetadataTestCase { private static final Logger logger = LoggerFactory.getLogger(ITZookeeperDistributedLockTest.class); - private static final String ZK_PFX = "/kylin/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000); + private static final String ZK_PFX = "/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000); static ZookeeperDistributedLock.Factory factory;