http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java index 983bfd9..eb01e4b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -19,13 +19,14 @@ package org.apache.kylin.storage.hbase.util; import java.nio.charset.Charset; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; @@ -36,18 +37,14 @@ import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * the jobLock is specially used to support distributed scheduler. - */ - public class ZookeeperDistributedJobLock implements DistributedJobLock { private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class); - public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; - - final private KylinConfig config; - final CuratorFramework zkClient; - final PathChildrenCache childrenCache; + @SuppressWarnings("unused") + private final KylinConfig config; + + private static final ConcurrentMap<KylinConfig, CuratorFramework> CACHE = new ConcurrentHashMap<KylinConfig, CuratorFramework>(); + private final CuratorFramework zkClient; public ZookeeperDistributedJobLock() { this(KylinConfig.getInstanceFromEnv()); @@ -57,16 +54,12 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { this.config = config; String zkConnectString = ZookeeperUtil.getZKConnectString(); - logger.info("zk connection string:" + zkConnectString); if (StringUtils.isEmpty(zkConnectString)) { throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); } - RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy); - zkClient.start(); + zkClient = getZKClient(config, zkConnectString); - childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { @@ -75,97 +68,104 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { })); } + //make the zkClient to be singleton + private static CuratorFramework getZKClient(KylinConfig config, String zkConnectString) { + CuratorFramework zkClient = CACHE.get(config); + if (zkClient == null) { + synchronized (ZookeeperDistributedJobLock.class) { + zkClient = CACHE.get(config); + if (zkClient == null) { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + zkClient = CuratorFrameworkFactory.newClient(zkConnectString, 120000, 15000, retryPolicy); + zkClient.start(); + CACHE.put(config, zkClient); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + } + } + } + return zkClient; + } + /** - * Lock the segment with the segmentId and serverName. - * - * <p> if the segment related job want to be scheduled, - * it must acquire the segment lock. segmentId is used to get the lock path, - * serverName marked which job server keep the segment lock. + * Try locking the path with the lockPath and lockClient, if lock successfully, + * the lockClient will write into the data of lockPath. * - * @param segmentId the id of segment need to lock + * @param lockPath the path will create in zookeeper * - * @param serverName the hostname of job server + * @param lockClient the mark of client * - * @return <tt>true</tt> if the segment locked successfully + * @return <tt>true</tt> if lock successfully or the lockClient has kept the lock * * @since 2.0 */ @Override - public boolean lockWithName(String segmentId, String serverName) { - String lockPath = getLockPath(segmentId); - logger.info(serverName + " start lock the segment: " + segmentId); + public boolean lockPath(String lockPath, String lockClient) { + logger.info(lockClient + " start lock the path: " + lockPath); boolean hasLock = false; try { - if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) { - logger.error("zookeeper have not start"); - return false; - } if (zkClient.checkExists().forPath(lockPath) != null) { - if (isKeepLock(serverName, lockPath)) { + if (isKeepLock(lockClient, lockPath)) { hasLock = true; - logger.info(serverName + " has kept the lock for segment: " + segmentId); + logger.info(lockClient + " has kept the lock for the path: " + lockPath); } } else { - zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8"))); - if (isKeepLock(serverName, lockPath)) { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(lockPath, lockClient.getBytes(Charset.forName("UTF-8"))); + if (isKeepLock(lockClient, lockPath)) { hasLock = true; - logger.info(serverName + " lock the segment: " + segmentId + " successfully"); + logger.info(lockClient + " lock the path: " + lockPath + " successfully"); } } } catch (Exception e) { - logger.error(serverName + " error acquire lock for the segment: " + segmentId, e); - } - if (!hasLock) { - logger.info(serverName + " fail to acquire lock for the segment: " + segmentId); - return false; + logger.error(lockClient + " error acquire lock for the path: " + lockPath, e); } - return true; + return hasLock; } /** * - * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath + * Returns <tt>true</tt> if, the lockClient is keeping the lock for the lockPath * - * @param serverName the hostname of job server + * @param lockClient the mark of client * - * @param lockPath the zookeeper node path of segment + * @param lockPath the zookeeper node path for the lock * - * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise + * @return <tt>true</tt> if the lockClient is keeping the lock for the lockPath, otherwise * <tt>false</tt> * * @since 2.0 */ - private boolean isKeepLock(String serverName, String lockPath) { + private boolean isKeepLock(String lockClient, String lockPath) { try { if (zkClient.checkExists().forPath(lockPath) != null) { byte[] data = zkClient.getData().forPath(lockPath); String lockServerName = new String(data, Charset.forName("UTF-8")); - return lockServerName.equalsIgnoreCase(serverName); + return lockServerName.equalsIgnoreCase(lockClient); } } catch (Exception e) { - logger.error("fail to get the serverName for the path: " + lockPath, e); + logger.error("fail to get the lockClient for the path: " + lockPath, e); } return false; } /** * - * Returns <tt>true</tt> if, and only if, the segment has been locked. + * Returns <tt>true</tt> if, and only if, the path has been locked. * - * @param segmentId the id of segment need to release the lock. + * @param lockPath the zookeeper node path for the lock * - * @return <tt>true</tt> if the segment has been locked, otherwise + * @return <tt>true</tt> if the path has been locked, otherwise * <tt>false</tt> * * @since 2.0 */ @Override - public boolean isHasLocked(String segmentId) { - String lockPath = getLockPath(segmentId); + public boolean isPathLocked(String lockPath) { try { return zkClient.checkExists().forPath(lockPath) != null; } catch (Exception e) { @@ -175,85 +175,80 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { } /** - * release the segment lock with the segmentId. + * release the lock with the specific path. * - * <p> the segment related zookeeper node will be deleted. + * <p> the path related zookeeper node will be deleted. * - * @param segmentId the id of segment need to release the lock + * @param lockPath the zookeeper node path for the lock. * * @since 2.0 */ @Override - public void unlockWithName(String segmentId) { - String lockPath = getLockPath(segmentId); + public void unlockPath(String lockPath) { try { - if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) { - if (zkClient.checkExists().forPath(lockPath) != null) { - zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); - logger.info("the lock for " + segmentId + " release successfully"); - } else { - logger.info("the lock for " + segmentId + " has released"); - } + if (zkClient.checkExists().forPath(lockPath) != null) { + zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath); + logger.info("the lock for " + lockPath + " release successfully"); + } else { + logger.info("the lock for " + lockPath + " has released"); } } catch (Exception e) { - logger.error("error release lock :" + segmentId); + logger.error("error release lock :" + lockPath); throw new RuntimeException(e); } } /** - * watching all the locked segments related zookeeper nodes change, - * in order to when one job server is down, other job server can take over the running jobs. + * watch the path so that when zookeeper node change, the client could receive the notification. + * Note: the client should close the PathChildrenCache in time. + * + * @param watchPath the path need to watch + * + * @param watchExecutor the executor watching the zookeeper node change * - * @param pool the threadPool watching the zookeeper node change + * @param watcherProcess do the concrete action with the node path and node data when zookeeper node changed * - * @param doWatch do the concrete action with the zookeeper node path and zookeeper node data + * @return PathChildrenCache the client should close the PathChildrenCache in time * * @since 2.0 */ @Override - public void watchLock(ExecutorService pool, final DoWatchLock doWatch) { + public PathChildrenCache watchPath(String watchPath, Executor watchExecutor, final Watcher watcherProcess) { + PathChildrenCache cache = new PathChildrenCache(zkClient, watchPath, true); try { - childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - childrenCache.getListenable().addListener(new PathChildrenCacheListener() { + cache.start(); + cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_REMOVED: - doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); + watcherProcess.process(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8"))); break; default: break; } } - }, pool); + }, watchExecutor); } catch (Exception e) { logger.warn("watch the zookeeper node fail: " + e); } - } - - private String getLockPath(String pathName) { - return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName; - } - - private String getWatchPath() { - return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix(); + return cache; } @Override - public boolean lock() { + public boolean lockJobEngine() { return true; } @Override - public void unlock() { + public void unlockJobEngine() { } + @Override public void close() { try { - childrenCache.close(); zkClient.close(); } catch (Exception e) { logger.error("error occurred to close PathChildrenCache", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/62e7c18d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java index 7bf7498..6a3cf7e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java @@ -56,7 +56,7 @@ public class ZookeeperJobLock implements JobLock { private CuratorFramework zkClient; @Override - public boolean lock() { + public boolean lockJobEngine() { this.scheduleID = schedulerId(); String zkConnectString = getZKConnectString(); logger.info("zk connection string:" + zkConnectString); @@ -100,7 +100,7 @@ public class ZookeeperJobLock implements JobLock { } @Override - public void unlock() { + public void unlockJobEngine() { releaseLock(); }