APACHE-KYLIN-2935: Improve the way to deploy coprocessor

Signed-off-by: lidongsjtu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7ca66d43
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7ca66d43
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7ca66d43

Branch: refs/heads/master
Commit: 7ca66d436ff5ec97dd80741dbaa9d73812743648
Parents: cc70166
Author: Zhong <[email protected]>
Authored: Fri Oct 13 12:21:50 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:21:17 2017 +0800

----------------------------------------------------------------------
 .../hbase/util/DeployCoprocessorCLI.java        | 66 ++++++++++++++++----
 1 file changed, 55 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7ca66d43/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index aa69151..366e8cc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -79,6 +79,8 @@ public class DeployCoprocessorCLI {
 
     private static final Logger logger = 
LoggerFactory.getLogger(DeployCoprocessorCLI.class);
 
+    private static int MAX_THREADS = 8;
+
     public static void main(String[] args) throws IOException {
 
         if (args == null || args.length <= 1) {
@@ -94,29 +96,40 @@ public class DeployCoprocessorCLI {
         try {
             hbaseAdmin = conn.getAdmin();
             String localCoprocessorJar;
-            if ("default".equals(args[0])) {
+            int curIdx = 0;
+            if ("default".equals(args[curIdx++])) {
                 localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
             } else {
-                localCoprocessorJar = new File(args[0]).getAbsolutePath();
+                localCoprocessorJar = new 
File(args[curIdx++]).getAbsolutePath();
             }
 
             logger.info("Identify coprocessor jar " + localCoprocessorJar);
 
+            try {
+                MAX_THREADS = Integer.parseInt(args[curIdx++]);
+            } catch (Exception e) {
+                curIdx--;
+            }
+            logger.info("Use at most {} threads to do upgrade", MAX_THREADS);
+
             List<String> tableNames = getHTableNames(kylinConfig);
             logger.info("Identify tables " + tableNames);
 
-            String filterType = args[1].toLowerCase();
+            String filterType = args[curIdx++].toLowerCase();
             if (filterType.equals("-table")) {
-                tableNames = filterByTables(tableNames, 
Arrays.asList(args).subList(2, args.length));
+                tableNames = filterByTables(tableNames, 
Arrays.asList(args).subList(curIdx, args.length));
             } else if (filterType.equals("-cube")) {
-                tableNames = filterByCubes(tableNames, 
Arrays.asList(args).subList(2, args.length));
+                tableNames = filterByCubes(tableNames, 
Arrays.asList(args).subList(curIdx, args.length));
             } else if (filterType.equals("-project")) {
-                tableNames = filterByProjects(tableNames, 
Arrays.asList(args).subList(2, args.length));
+                tableNames = filterByProjects(tableNames, 
Arrays.asList(args).subList(curIdx, args.length));
             } else if (!filterType.equals("all")) {
                 printUsageAndExit();
             }
+            logger.info("Tables after filtering by type " + filterType + ": " 
+ tableNames);
 
+            tableNames = filterByGitCommit(hbaseAdmin, tableNames);
             logger.info("Will execute tables " + tableNames);
+
             long start = System.currentTimeMillis();
 
             Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, 
tableNames);
@@ -145,13 +158,39 @@ public class DeployCoprocessorCLI {
 
     private static void printUsageAndExit() {
         logger.info("Usage: ");
-        logger.info("$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar all");
-        logger.info("$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar -table tableName1 tableName2 ...");
-        logger.info("$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar -cube cubeName1 cubeName2 ... ");
-        logger.info("$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar -project projectName1 projectName2 
...");
+        logger.info(
+                "$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] all");
+        logger.info(
+                "$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -table tableName1 
tableName2 ...");
+        logger.info(
+                "$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -cube cubeName1 cubeName2 
... ");
+        logger.info(
+                "$KYLIN_HOME/bin/kylin.sh  
org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI 
$KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -project projectName1 
projectName2 ...");
         System.exit(0);
     }
 
+    private static List<String> filterByGitCommit(Admin hbaseAdmin, 
List<String> tableNames) throws IOException {
+        List<String> result = Lists.newLinkedList();
+        List<String> filteredList = Lists.newLinkedList();
+
+        String commitInfo = KylinVersion.getGitCommitInfo();
+        if (StringUtils.isEmpty(commitInfo)) {
+            return tableNames;
+        }
+        logger.info("Commit Information: " + commitInfo);
+        for (String tableName : tableNames) {
+            HTableDescriptor tableDesc = 
hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            String gitTag = 
tableDesc.getValue(IRealizationConstants.HTableGitTag);
+            if (commitInfo.equals(gitTag)) {
+                filteredList.add(tableName);
+            } else {
+                result.add(tableName);
+            }
+        }
+        logger.info("Filtered tables don't need to deploy coprocessors: " + 
filteredList);
+        return result;
+    }
+
     private static List<String> filterByProjects(List<String> allTableNames, 
List<String> projectNames) {
         ProjectManager projectManager = 
ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeManager cubeManager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -291,7 +330,12 @@ public class DeployCoprocessorCLI {
         List<String> processedTables = Collections.synchronizedList(new 
ArrayList<String>());
         List<String> failedTables = Collections.synchronizedList(new 
ArrayList<String>());
 
-        ExecutorService coprocessorPool = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+        int nThread = Runtime.getRuntime().availableProcessors() * 2;
+        if (nThread > MAX_THREADS) {
+            nThread = MAX_THREADS;
+        }
+        logger.info("Use {} threads to do upgrade", nThread);
+        ExecutorService coprocessorPool = 
Executors.newFixedThreadPool(nThread);
         CountDownLatch countDownLatch = new CountDownLatch(tableNames.size());
 
         for (final String tableName : tableNames) {

Reply via email to