APACHE-KYLIN-2935: Improve the way to deploy coprocessor Signed-off-by: lidongsjtu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7ca66d43 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7ca66d43 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7ca66d43 Branch: refs/heads/master Commit: 7ca66d436ff5ec97dd80741dbaa9d73812743648 Parents: cc70166 Author: Zhong <[email protected]> Authored: Fri Oct 13 12:21:50 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:21:17 2017 +0800 ---------------------------------------------------------------------- .../hbase/util/DeployCoprocessorCLI.java | 66 ++++++++++++++++---- 1 file changed, 55 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7ca66d43/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index aa69151..366e8cc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -79,6 +79,8 @@ public class DeployCoprocessorCLI { private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class); + private static int MAX_THREADS = 8; + public static void main(String[] args) throws IOException { if (args == null || args.length <= 1) { @@ -94,29 +96,40 @@ public class DeployCoprocessorCLI { try { hbaseAdmin = conn.getAdmin(); String localCoprocessorJar; - if ("default".equals(args[0])) { + int curIdx = 0; + if ("default".equals(args[curIdx++])) { localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); } else { - localCoprocessorJar = new File(args[0]).getAbsolutePath(); + localCoprocessorJar = new File(args[curIdx++]).getAbsolutePath(); } logger.info("Identify coprocessor jar " + localCoprocessorJar); + try { + MAX_THREADS = Integer.parseInt(args[curIdx++]); + } catch (Exception e) { + curIdx--; + } + logger.info("Use at most {} threads to do upgrade", MAX_THREADS); + List<String> tableNames = getHTableNames(kylinConfig); logger.info("Identify tables " + tableNames); - String filterType = args[1].toLowerCase(); + String filterType = args[curIdx++].toLowerCase(); if (filterType.equals("-table")) { - tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length)); + tableNames = filterByTables(tableNames, Arrays.asList(args).subList(curIdx, args.length)); } else if (filterType.equals("-cube")) { - tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length)); + tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(curIdx, args.length)); } else if (filterType.equals("-project")) { - tableNames = filterByProjects(tableNames, Arrays.asList(args).subList(2, args.length)); + tableNames = filterByProjects(tableNames, Arrays.asList(args).subList(curIdx, args.length)); } else if (!filterType.equals("all")) { printUsageAndExit(); } + logger.info("Tables after filtering by type " + filterType + ": " + tableNames); + tableNames = filterByGitCommit(hbaseAdmin, tableNames); logger.info("Will execute tables " + tableNames); + long start = System.currentTimeMillis(); Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames); @@ -145,13 +158,39 @@ public class DeployCoprocessorCLI { private static void printUsageAndExit() { logger.info("Usage: "); - logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar all"); - logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -table tableName1 tableName2 ..."); - logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -cube cubeName1 cubeName2 ... "); - logger.info("$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar -project projectName1 projectName2 ..."); + logger.info( + "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] all"); + logger.info( + "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -table tableName1 tableName2 ..."); + logger.info( + "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -cube cubeName1 cubeName2 ... "); + logger.info( + "$KYLIN_HOME/bin/kylin.sh org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI $KYLIN_HOME/lib/kylin-coprocessor-*.jar [nOfThread] -project projectName1 projectName2 ..."); System.exit(0); } + private static List<String> filterByGitCommit(Admin hbaseAdmin, List<String> tableNames) throws IOException { + List<String> result = Lists.newLinkedList(); + List<String> filteredList = Lists.newLinkedList(); + + String commitInfo = KylinVersion.getGitCommitInfo(); + if (StringUtils.isEmpty(commitInfo)) { + return tableNames; + } + logger.info("Commit Information: " + commitInfo); + for (String tableName : tableNames) { + HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag); + if (commitInfo.equals(gitTag)) { + filteredList.add(tableName); + } else { + result.add(tableName); + } + } + logger.info("Filtered tables don't need to deploy coprocessors: " + filteredList); + return result; + } + private static List<String> filterByProjects(List<String> allTableNames, List<String> projectNames) { ProjectManager projectManager = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -291,7 +330,12 @@ public class DeployCoprocessorCLI { List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); List<String> failedTables = Collections.synchronizedList(new ArrayList<String>()); - ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + int nThread = Runtime.getRuntime().availableProcessors() * 2; + if (nThread > MAX_THREADS) { + nThread = MAX_THREADS; + } + logger.info("Use {} threads to do upgrade", nThread); + ExecutorService coprocessorPool = Executors.newFixedThreadPool(nThread); CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); for (final String tableName : tableNames) {
