Repository: kylin Updated Branches: refs/heads/KYLIN-2606 f8336d4bb -> d610a6dd5 (forced update)
KYLIN-2557 code review Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1c80c29b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1c80c29b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1c80c29b Branch: refs/heads/KYLIN-2606 Commit: 1c80c29b22d9c00de1eb2e9a09c21377714248b7 Parents: e6a8a00 Author: Yang Li <liy...@apache.org> Authored: Sun May 14 20:14:29 2017 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun May 14 20:14:29 2017 +0800 ---------------------------------------------------------------------- .../kylin/storage/hbase/HBaseConnection.java | 54 ++++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1c80c29b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index ef82641..5fafa2b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -18,7 +18,18 @@ package org.apache.kylin.storage.hbase; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,17 +50,7 @@ import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; /** * @author yangli9 @@ -67,8 +68,6 @@ public class HBaseConnection { private static ExecutorService coprocessorPool = null; - private static DistributedLock lock = null; - static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override @@ -268,18 +267,20 @@ public class HBaseConnection { } public static void createHTableIfNeeded(Connection conn, String table, String... families) throws IOException { - Admin hbase = conn.getAdmin(); + Admin admin = conn.getAdmin(); TableName tableName = TableName.valueOf(table); - boolean hasLock = false; + DistributedLock lock = null; + String lockPath = getLockPath(table); + try { if (tableExists(conn, table)) { logger.debug("HTable '" + table + "' already exists"); - Set<String> existingFamilies = getFamilyNames(hbase.getTableDescriptor(tableName)); + Set<String> existingFamilies = getFamilyNames(admin.getTableDescriptor(tableName)); boolean wait = false; for (String family : families) { if (existingFamilies.contains(family) == false) { logger.debug("Adding family '" + family + "' to HTable '" + table + "'"); - hbase.addColumn(tableName, newFamilyDescriptor(family)); + admin.addColumn(tableName, newFamilyDescriptor(family)); // addColumn() is async, is there a way to wait it finish? wait = true; } @@ -295,7 +296,8 @@ public class HBaseConnection { } lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess(); - hasLock = lock.lock(getLockPath(table), Long.MAX_VALUE); + if (!lock.lock(lockPath, Long.MAX_VALUE)) + throw new RuntimeException("Cannot acquire lock to create HTable " + table); if (tableExists(conn, table)) { logger.debug("HTable '" + table + "' already exists"); @@ -313,15 +315,13 @@ public class HBaseConnection { } } - //desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString()); - hbase.createTable(desc); + admin.createTable(desc); logger.debug("HTable '" + table + "' created"); } finally { - hbase.close(); - if (hasLock && lock != null) { - lock.unlock(getLockPath(table)); - } + admin.close(); + if (lock != null && lock.isLockedByMe(lockPath)) + lock.unlock(lockPath); } } @@ -365,8 +365,8 @@ public class HBaseConnection { } } - private static String getLockPath(String pathName) { - return "/create_htable/" + pathName + "/lock"; + private static String getLockPath(String table) { + return "/create_htable/" + table + "/lock"; } } \ No newline at end of file