http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 983bfd9..eb01e4b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -19,13 +19,14 @@
 package org.apache.kylin.storage.hbase.util;
 
 import java.nio.charset.Charset;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@@ -36,18 +37,14 @@ import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * the jobLock is specially used to support distributed scheduler.
- */
-
 public class ZookeeperDistributedJobLock implements DistributedJobLock {
     private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
 
-    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
-    final private KylinConfig config;
-    final CuratorFramework zkClient;
-    final PathChildrenCache childrenCache;
+    @SuppressWarnings("unused")
+    private final KylinConfig config;
+
+    private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>();
+    private final CuratorFramework zkClient;
 
     public ZookeeperDistributedJobLock() {
         this(KylinConfig.getInstanceFromEnv());
@@ -57,16 +54,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         this.config = config;
 
         String zkConnectString = ZookeeperUtil.getZKConnectString();
-        logger.info("zk connection string:" + zkConnectString);
         if (StringUtils.isEmpty(zkConnectString)) {
             throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
         }
 
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
-        zkClient.start();
+        zkClient = getZKClient(config, zkConnectString); // shared per KylinConfig via CACHE; NOTE(review): confirm the per-instance shutdown hook below does not close this shared client under other instances
 
-        childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
@@ -75,97 +68,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         }));
     }
 
+    // One shared client per KylinConfig; a cached client that is no longer STARTED (e.g. after close()) is replaced.
+    private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) {
+        CuratorFramework zkClient = CACHE.get(config);
+        if (zkClient == null || zkClient.getState() != org.apache.curator.framework.imps.CuratorFrameworkState.STARTED) {
+            synchronized (ZookeeperDistributedJobLock.class) {
+                zkClient = CACHE.get(config);
+                if (zkClient == null || zkClient.getState() != org.apache.curator.framework.imps.CuratorFrameworkState.STARTED) {
+                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+                    zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy); // session / connection timeout in ms
+                    zkClient.start();
+                    CACHE.put(config, zkClient);
+                    if (CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return zkClient;
+    }
+
     /**
-     * Lock the segment with the segmentId and serverName.
-     *
-     * <p> if the segment related job want to be scheduled,
-     * it must acquire the segment lock. segmentId is used to get the lock path,
-     * serverName marked which job server keep the segment lock.
+     * Tries to lock lockPath for lockClient: on success an ephemeral zookeeper node is created at
+     * lockPath (parent nodes created as needed) whose data is lockClient encoded as UTF-8.
      *
-     * @param segmentId the id of segment need to lock
+     * @param lockPath the zookeeper node path to create as the lock
      *
-     * @param serverName the hostname of job server
+     * @param lockClient the identifier of the client, stored as the data of the lock node
      *
-     * @return <tt>true</tt> if the segment locked successfully
+     * @return <tt>true</tt> if the lock was acquired or lockClient already holds it; <tt>false</tt> otherwise, including on zookeeper errors
      *
      * @since 2.0
      */
 
     @Override
-    public boolean lockWithName(String segmentId, String serverName) {
-        String lockPath = getLockPath(segmentId);
-        logger.info(serverName + " start lock the segment: " + segmentId);
+    public boolean lockPath(String lockPath, String lockClient) {
+        logger.info(lockClient + " start lock the path: " + lockPath);
 
         boolean hasLock = false;
         try {
-            if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
-                logger.error("zookeeper have not start");
-                return false;
-            }
             if (zkClient.checkExists().forPath(lockPath) != null) {
-                if (isKeepLock(serverName, lockPath)) {
+                if (isKeepLock(lockClient, lockPath)) {
                     hasLock = true;
-                    logger.info(serverName + " has kept the lock for segment: " + segmentId);
+                    logger.info(lockClient + " has kept the lock for the path: " + lockPath);
                 }
             } else {
-                zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
-                if (isKeepLock(serverName, lockPath)) {
+                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8")));
+                if (isKeepLock(lockClient, lockPath)) {
                     hasLock = true;
-                    logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+                    logger.info(lockClient + " lock the path: " + lockPath + " successfully");
                 }
             }
         } catch (Exception e) {
-            logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
-        }
-        if (!hasLock) {
-            logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
-            return false;
+            logger.error(lockClient + " error acquire lock for the path: " + lockPath, e);
         }
-        return true;
+        return hasLock;
     }
 
     /**
      *
-     * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+     * Returns <tt>true</tt> if the data stored at lockPath equals lockClient (ignoring case), i.e. lockClient holds the lock
      *
-     * @param serverName the hostname of job server
+     * @param lockClient the identifier of the client to check
      *
-     * @param lockPath the zookeeper node path of segment
+     * @param lockPath the zookeeper node path for the lock
      *
-     * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+     * @return <tt>true</tt> if lockClient is keeping the lock for lockPath; if the node is missing or cannot be read,
      * <tt>false</tt>
      *
      * @since 2.0
      */
 
-    private boolean isKeepLock(String serverName, String lockPath) {
+    private boolean isKeepLock(String lockClient, String lockPath) {
         try {
             if (zkClient.checkExists().forPath(lockPath) != null) {
                 byte[] data = zkClient.getData().forPath(lockPath);
                 String lockServerName = new String(data, Charset.forName("UTF-8"));
-                return lockServerName.equalsIgnoreCase(serverName);
+                return lockServerName.equalsIgnoreCase(lockClient);
             }
         } catch (Exception e) {
-            logger.error("fail to get the serverName for the path: " + lockPath, e);
+            logger.error("fail to get the lockClient for the path: " + lockPath, e);
         }
         return false;
     }
 
     /**
      *
-     * Returns <tt>true</tt> if, and only if, the segment has been locked.
+     * Returns <tt>true</tt> if, and only if, the path has been locked, i.e. a zookeeper node exists at lockPath.
      *
-     * @param segmentId the id of segment need to release the lock.
+     * @param lockPath the zookeeper node path for the lock
      *
-     * @return <tt>true</tt> if the segment has been locked, otherwise
+     * @return <tt>true</tt> if a node exists at lockPath, otherwise
      * <tt>false</tt>
      *
      * @since 2.0
      */
 
     @Override
-    public boolean isHasLocked(String segmentId) {
-        String lockPath = getLockPath(segmentId);
+    public boolean isPathLocked(String lockPath) {
         try {
             return zkClient.checkExists().forPath(lockPath) != null;
         } catch (Exception e) {
@@ -175,85 +175,80 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     }
 
     /**
-     * release the segment lock with the segmentId.
+     * Releases the lock at lockPath by deleting its zookeeper node, together with any children.
      *
-     * <p> the segment related zookeeper node will be deleted.
+     * <p> The node is deleted whichever client created it; a missing node is only logged, and zookeeper errors are rethrown as RuntimeException.
      *
-     * @param segmentId the id of segment need to release the lock
+     * @param lockPath the zookeeper node path for the lock
      *
      * @since 2.0
      */
 
     @Override
-    public void unlockWithName(String segmentId) {
-        String lockPath = getLockPath(segmentId);
+    public void unlockPath(String lockPath) {
         try {
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                if (zkClient.checkExists().forPath(lockPath) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
-                    logger.info("the lock for " + segmentId + " release successfully");
-                } else {
-                    logger.info("the lock for " + segmentId + " has released");
-                }
+            if (zkClient.checkExists().forPath(lockPath) != null) {
+                zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+                logger.info("the lock for " + lockPath + " release successfully");
+            } else {
+                logger.info("the lock for " + lockPath + " has released");
             }
         } catch (Exception e) {
-            logger.error("error release lock :" + segmentId);
+            logger.error("error release lock :" + lockPath);
             throw new RuntimeException(e);
         }
     }
 
     /**
-     * watching all the locked segments related zookeeper nodes change,
-     * in order to when one job server is down, other job server can take over the running jobs.
+     * Watches the children of watchPath: whenever a child node is removed, watcherProcess is called
+     * with the removed node's path and its data decoded as UTF-8. The caller must close the returned cache.
+     *
+     * @param watchPath the zookeeper path whose children are watched
+     *
+     * @param watchExecutor the executor on which watcherProcess is run
      *
-     * @param pool the threadPool watching the zookeeper node change
+     * @param watcherProcess callback receiving the path and data of each removed child node
      *
-     * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+     * @return the PathChildrenCache backing the watch; the caller should close it when done
      *
      * @since 2.0
      */
 
     @Override
-    public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+    public PathChildrenCache watchPath(String watchPath, Executor watchExecutor, final Watcher watcherProcess) {
+        PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true);
         try {
-            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
-            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+            cache.getListenable().addListener(new PathChildrenCacheListener() {
                 @Override
                 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                     switch (event.getType()) {
                     case CHILD_REMOVED:
-                        doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+                        watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
                         break;
                     default:
                         break;
                     }
                 }
-            }, pool);
+            }, watchExecutor);
+            cache.start(); // started only after the listener is registered, so no event raised during start-up is missed
         } catch (Exception e) {
-            logger.warn("watch the zookeeper node fail: " + e);
+            logger.warn("watch the zookeeper node fail: " + e, e);
         }
-    }
-
-    private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
-    }
-
-    private String getWatchPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
+        return cache;
     }
 
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() { // always succeeds: this implementation takes no engine-wide lock
         return true;
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() { // no-op: lockJobEngine() acquires nothing that needs releasing
     }
 
+    @Override
     public void close() {
         try {
-            childrenCache.close();
             zkClient.close();
         } catch (Exception e) {
             logger.error("error occurred to close PathChildrenCache", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
index 7bf7498..6a3cf7e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -56,7 +56,7 @@ public class ZookeeperJobLock implements JobLock {
     private CuratorFramework zkClient;
 
     @Override
-    public boolean lock() {
+    public boolean lockJobEngine() {
         this.scheduleID = schedulerId();
         String zkConnectString = getZKConnectString();
         logger.info("zk connection string:" + zkConnectString);
@@ -100,7 +100,7 @@ public class ZookeeperJobLock implements JobLock {
     }
 
     @Override
-    public void unlock() {
+    public void unlockJobEngine() { // renamed from unlock(); still delegates to releaseLock()
         releaseLock();
     }
 

Reply via email to