KYLIN-2578 ensures every zk lock path is prefixed with /kylin/<metadata-url-prefix>

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/263a1e88
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/263a1e88
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/263a1e88

Branch: refs/heads/KYLIN-2624
Commit: 263a1e8889ba223883ba42178debeee0bd02c240
Parents: d738544
Author: Yang Li <liy...@apache.org>
Authored: Sun May 7 15:56:43 2017 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun May 7 15:56:43 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/lock/DistributedLock.java      |  2 ++
 .../kylin/dict/GlobalDictionaryBuilder.java     |  4 +--
 .../impl/threadpool/DistributedScheduler.java   |  6 ++--
 .../hbase/util/ZookeeperDistributedLock.java    | 35 +++++++++++++++++---
 .../util/ITZookeeperDistributedLockTest.java    |  2 +-
 5 files changed, 38 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java 
b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
index e8844fd..8f1fae0 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/lock/DistributedLock.java
@@ -23,6 +23,8 @@ import java.util.concurrent.Executor;
 
 /**
  * A distributed lock. Every instance is owned by a client, on whose behalf 
locks are acquired and/or released.
+ * 
+ * Implementations must ensure that every <code>lockPath</code> is automatically 
prefixed with "/kylin/" followed by the metadata URL prefix.
  */
 public interface DistributedLock {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 0ec7730..a593371 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -93,10 +93,8 @@ public class GlobalDictionaryBuilder implements 
IDictionaryBuilder {
         return new AppendTrieDictionary<>();
     }
 
-    private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock";
-
     private String getLockPath(String pathName) {
-        return GLOBAL_DICT_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + 
"/lock";
+        return "/dict/" + pathName + "/lock";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index e80f485..0714d90 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -84,7 +84,7 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
 
 
     private final static String SEGMENT_ID = "segmentId";
-    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    public static final String ZOOKEEPER_LOCK_PATH = "/job_engine/lock"; // 
note: ZookeeperDistributedLock prepends the zk path prefix "/kylin/" + metadata URL prefix
 
     //only for it test
     public static DistributedScheduler getInstance(KylinConfig config) {
@@ -305,11 +305,11 @@ public class DistributedScheduler implements 
Scheduler<AbstractExecutable>, Conn
     }
 
     public String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+        return ZOOKEEPER_LOCK_PATH + "/" + pathName;
     }
 
     private String getWatchPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+        return ZOOKEEPER_LOCK_PATH;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
index 9f692e4..d181d81 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java
@@ -35,7 +35,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.lock.DistributedLockFactory;
-import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
 import org.apache.kylin.job.lock.JobLock;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -44,6 +43,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A distributed lock based on zookeeper. Every instance is owned by a client, 
on whose behalf locks are acquired and/or released.
+ * 
+ * Every <code>lockPath</code> is automatically prefixed with "/kylin/" followed 
by the metadata URL prefix.
  */
 public class ZookeeperDistributedLock implements DistributedLock, JobLock {
     private static Logger logger = 
LoggerFactory.getLogger(ZookeeperDistributedLock.class);
@@ -92,6 +93,7 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
             return ZookeeperUtil.getZKConnectString();
         }
 
+        final String zkPathBase;
         final CuratorFramework curator;
 
         public Factory() {
@@ -100,25 +102,30 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
 
         public Factory(KylinConfig config) {
             this.curator = getZKClient(config);
+            this.zkPathBase = "/kylin/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
         }
 
         @Override
         public DistributedLock lockForClient(String client) {
-            return new ZookeeperDistributedLock(curator, client);
+            return new ZookeeperDistributedLock(curator, zkPathBase, client);
         }
     }
 
     // 
============================================================================
 
     final CuratorFramework curator;
+    final String zkPathBase;
     final String client;
     final byte[] clientBytes;
 
-    private ZookeeperDistributedLock(CuratorFramework curator, String client) {
+    private ZookeeperDistributedLock(CuratorFramework curator, String 
zkPathBase, String client) {
         if (client == null)
             throw new NullPointerException("client must not be null");
+        if (zkPathBase == null)
+            throw new NullPointerException("zkPathBase must not be null");
         
         this.curator = curator;
+        this.zkPathBase = zkPathBase;
         this.client = client;
         this.clientBytes = client.getBytes(Charset.forName("UTF-8"));
     }
@@ -130,6 +137,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
 
     @Override
     public boolean lock(String lockPath) {
+        lockPath = norm(lockPath);
+        
         logger.debug(client + " trying to lock " + lockPath);
 
         try {
@@ -152,6 +161,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
 
     @Override
     public boolean lock(String lockPath, long timeout) {
+        lockPath = norm(lockPath);
+
         if (lock(lockPath))
             return true;
 
@@ -182,6 +193,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
 
     @Override
     public String peekLock(String lockPath) {
+        lockPath = norm(lockPath);
+
         try {
             byte[] bytes = curator.getData().forPath(lockPath);
             return new String(bytes, Charset.forName("UTF-8"));
@@ -204,6 +217,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
     
     @Override
     public void unlock(String lockPath) {
+        lockPath = norm(lockPath);
+
         logger.debug(client + " trying to unlock " + lockPath);
 
         String owner = peekLock(lockPath);
@@ -224,6 +239,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
     
     @Override
     public void purgeLocks(String lockPathRoot) {
+        lockPathRoot = norm(lockPathRoot);
+
         try {
             
curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);
 
@@ -236,6 +253,8 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
 
     @Override
     public Closeable watchLocks(String lockPathRoot, Executor executor, final 
Watcher watcher) {
+        lockPathRoot = norm(lockPathRoot);
+
         PathChildrenCache cache = new PathChildrenCache(curator, lockPathRoot, 
true);
         try {
             cache.start();
@@ -259,6 +278,14 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
         }
         return cache;
     }
+    
+    // normalize lock path
+    private String norm(String lockPath) {
+        if (lockPath.startsWith(zkPathBase))
+            return lockPath;
+        else
+            return zkPathBase + (lockPath.startsWith("/") ? "" : "/") + 
lockPath;
+    }
 
     // 
============================================================================
 
@@ -274,7 +301,7 @@ public class ZookeeperDistributedLock implements 
DistributedLock, JobLock {
     }
 
     private String jobEngineLockPath() {
-        return DistributedScheduler.ZOOKEEPER_LOCK_PATH + "/" + 
KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/global_engine_lock";
+        return "/job_engine/global_job_engine_lock";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/263a1e88/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java
 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java
index 797b66b..1bfe8f7 100644
--- 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java
+++ 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 
 public class ITZookeeperDistributedLockTest extends HBaseMetadataTestCase {
     private static final Logger logger = 
LoggerFactory.getLogger(ITZookeeperDistributedLockTest.class);
-    private static final String ZK_PFX = 
"/kylin/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000);
+    private static final String ZK_PFX = "/test/ZookeeperDistributedLockTest/" 
+ new Random().nextInt(10000000);
 
     static ZookeeperDistributedLock.Factory factory;
 

Reply via email to