Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2606 f8336d4bb -> d610a6dd5 (forced update)


KYLIN-2557 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1c80c29b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1c80c29b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1c80c29b

Branch: refs/heads/KYLIN-2606
Commit: 1c80c29b22d9c00de1eb2e9a09c21377714248b7
Parents: e6a8a00
Author: Yang Li <liy...@apache.org>
Authored: Sun May 14 20:14:29 2017 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun May 14 20:14:29 2017 +0800

----------------------------------------------------------------------
 .../kylin/storage/hbase/HBaseConnection.java    | 54 ++++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1c80c29b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index ef82641..5fafa2b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -18,7 +18,18 @@
 
 package org.apache.kylin.storage.hbase;
 
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,17 +50,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
 
 /**
  * @author yangli9
@@ -67,8 +68,6 @@ public class HBaseConnection {
 
     private static ExecutorService coprocessorPool = null;
 
-    private static DistributedLock lock = null;
-
     static {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
@@ -268,18 +267,20 @@ public class HBaseConnection {
     }
 
     public static void createHTableIfNeeded(Connection conn, String table, 
String... families) throws IOException {
-        Admin hbase = conn.getAdmin();
+        Admin admin = conn.getAdmin();
         TableName tableName = TableName.valueOf(table);
-        boolean hasLock = false;
+        DistributedLock lock = null;
+        String lockPath = getLockPath(table);
+        
         try {
             if (tableExists(conn, table)) {
                 logger.debug("HTable '" + table + "' already exists");
-                Set<String> existingFamilies = 
getFamilyNames(hbase.getTableDescriptor(tableName));
+                Set<String> existingFamilies = 
getFamilyNames(admin.getTableDescriptor(tableName));
                 boolean wait = false;
                 for (String family : families) {
                     if (existingFamilies.contains(family) == false) {
                         logger.debug("Adding family '" + family + "' to HTable 
'" + table + "'");
-                        hbase.addColumn(tableName, 
newFamilyDescriptor(family));
+                        admin.addColumn(tableName, 
newFamilyDescriptor(family));
                         // addColumn() is async, is there a way to wait it 
finish?
                         wait = true;
                     }
@@ -295,7 +296,8 @@ public class HBaseConnection {
             }
 
             lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentProcess();
-            hasLock = lock.lock(getLockPath(table), Long.MAX_VALUE);
+            if (!lock.lock(lockPath, Long.MAX_VALUE))
+                throw new RuntimeException("Cannot acquire lock to create 
HTable " + table);
 
             if (tableExists(conn, table)) {
                 logger.debug("HTable '" + table + "' already exists");
@@ -313,15 +315,13 @@ public class HBaseConnection {
                 }
             }
 
-            //desc.setValue(HTABLE_UUID_TAG, UUID.randomUUID().toString());
-            hbase.createTable(desc);
+            admin.createTable(desc);
 
             logger.debug("HTable '" + table + "' created");
         } finally {
-            hbase.close();
-            if (hasLock && lock != null) {
-                lock.unlock(getLockPath(table));
-            }
+            admin.close();
+            if (lock != null && lock.isLockedByMe(lockPath))
+                lock.unlock(lockPath);
         }
     }
 
@@ -365,8 +365,8 @@ public class HBaseConnection {
         }
     }
 
-    private static String getLockPath(String pathName) {
-        return "/create_htable/" + pathName + "/lock";
+    private static String getLockPath(String table) {
+        return "/create_htable/" + table + "/lock";
     }
 
 }
\ No newline at end of file

Reply via email to