Repository: kylin
Updated Branches:
  refs/heads/master 3bdafb1a1 -> 8615be974
minor, parallelize diagnosis tool Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8615be97 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8615be97 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8615be97 Branch: refs/heads/master Commit: 8615be974cfa99f54ceeb995ff2122dec0ded636 Parents: 3bdafb1 Author: lidongsjtu <lid...@apache.org> Authored: Thu Jul 21 21:57:49 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Fri Jul 22 11:01:07 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/tool/DiagnosisInfoCLI.java | 119 ++++++++++++++----- 1 file changed, 88 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8615be97/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java index cf563a5..3e24868 100644 --- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java @@ -22,6 +22,9 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -55,6 +58,13 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { @SuppressWarnings("static-access") private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Specify whether to include job info to extract. 
Default true.").create("includeJobs"); + @SuppressWarnings("static-access") + private static final Option OPTION_THREADS = OptionBuilder.withArgName("threads").hasArg().isRequired(false).withDescription("Specify number of threads for parallel extraction.").create("threads"); + + private static final int DEFAULT_PARALLEL_SIZE = 4; + + private ExecutorService executorService; + public DiagnosisInfoCLI() { super(); @@ -65,6 +75,7 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { options.addOption(OPTION_INCLUDE_HBASE); options.addOption(OPTION_INCLUDE_CLIENT); options.addOption(OPTION_INCLUDE_JOB); + } public static void main(String[] args) { @@ -89,57 +100,103 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { } @Override - protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws IOException { + protected void executeExtract(final OptionsHelper optionsHelper, final File exportDir) throws IOException { final String projectInput = optionsHelper.getOptionValue(options.getOption("project")); - boolean includeConf = optionsHelper.hasOption(OPTION_INCLUDE_CONF) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CONF)) : true; - boolean includeHBase = optionsHelper.hasOption(OPTION_INCLUDE_HBASE) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_HBASE)) : true; - boolean includeClient = optionsHelper.hasOption(OPTION_INCLUDE_CLIENT) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CLIENT)) : true; - boolean includeJob = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true; - String projectNames = StringUtils.join(getProjects(projectInput), ","); + final boolean includeConf = optionsHelper.hasOption(OPTION_INCLUDE_CONF) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CONF)) : true; + final boolean includeHBase = optionsHelper.hasOption(OPTION_INCLUDE_HBASE) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_HBASE)) : true; + final boolean includeClient = optionsHelper.hasOption(OPTION_INCLUDE_CLIENT) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CLIENT)) : true; + final boolean includeJob = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true; + final int threadsNum = optionsHelper.hasOption(OPTION_THREADS) ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_THREADS)) : DEFAULT_PARALLEL_SIZE; + final String projectNames = StringUtils.join(getProjects(projectInput), ","); + + logger.info("Start diagnosis info extraction in {} threads.", threadsNum); + executorService = Executors.newFixedThreadPool(threadsNum); // export cube metadata - String[] cubeMetaArgs = { "-destDir", new File(exportDir, "metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-includeJobs", "false", "-submodule", "true" }; - CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor(); - logger.info("CubeMetaExtractor args: " + Arrays.toString(cubeMetaArgs)); - cubeMetaExtractor.execute(cubeMetaArgs); + executorService.execute(new Runnable() { + @Override + public void run() { + String[] cubeMetaArgs = { "-destDir", new File(exportDir, "metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-includeJobs", "false", "-submodule", "true" }; + CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor(); + logger.info("CubeMetaExtractor args: " + Arrays.toString(cubeMetaArgs)); + cubeMetaExtractor.execute(cubeMetaArgs); + } + }); // extract all job instances if (includeJob) { - String[] jobArgs = { "-destDir", new File(exportDir, "jobs").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; - JobInstanceExtractor jobInstanceExtractor = new JobInstanceExtractor(); - jobInstanceExtractor.execute(jobArgs); + executorService.execute(new Runnable() { + @Override + public void run() { + String[] jobArgs = 
{ "-destDir", new File(exportDir, "jobs").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; + JobInstanceExtractor jobInstanceExtractor = new JobInstanceExtractor(); + jobInstanceExtractor.execute(jobArgs); + } + }); } // export HBase if (includeHBase) { - String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" }; - HBaseUsageExtractor hBaseUsageExtractor = new HBaseUsageExtractor(); - logger.info("HBaseUsageExtractor args: " + Arrays.toString(hbaseArgs)); - hBaseUsageExtractor.execute(hbaseArgs); + executorService.execute(new Runnable() { + @Override + public void run() { + String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" }; + HBaseUsageExtractor hBaseUsageExtractor = new HBaseUsageExtractor(); + logger.info("HBaseUsageExtractor args: " + Arrays.toString(hbaseArgs)); + hBaseUsageExtractor.execute(hbaseArgs); + } + }); } // export conf if (includeConf) { - logger.info("Start to extract kylin conf files."); - try { - FileUtils.copyDirectoryToDirectory(new File(ToolUtil.getConfFolder()), exportDir); - } catch (Exception e) { - logger.warn("Error in export conf.", e); - } + executorService.execute(new Runnable() { + @Override + public void run() { + logger.info("Start to extract kylin conf files."); + try { + FileUtils.copyDirectoryToDirectory(new File(ToolUtil.getConfFolder()), exportDir); + } catch (Exception e) { + logger.warn("Error in export conf.", e); + } + } + }); + } // export client if (includeClient) { - String[] clientArgs = { "-destDir", new File(exportDir, "client").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; - ClientEnvExtractor clientEnvExtractor = new ClientEnvExtractor(); - logger.info("ClientEnvExtractor args: " + Arrays.toString(clientArgs)); - clientEnvExtractor.execute(clientArgs); + executorService.execute(new 
Runnable() { + @Override + public void run() { + try { + String[] clientArgs = { "-destDir", new File(exportDir, "client").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; + ClientEnvExtractor clientEnvExtractor = new ClientEnvExtractor(); + logger.info("ClientEnvExtractor args: " + Arrays.toString(clientArgs)); + clientEnvExtractor.execute(clientArgs); + } catch (IOException e) { + logger.warn("Error in export client info.", e); + } + } + }); } // export logs - String[] logsArgs = { "-destDir", new File(exportDir, "logs").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; - KylinLogExtractor logExtractor = new KylinLogExtractor(); - logger.info("KylinLogExtractor args: " + Arrays.toString(logsArgs)); - logExtractor.execute(logsArgs); + executorService.execute(new Runnable() { + @Override + public void run() { + String[] logsArgs = { "-destDir", new File(exportDir, "logs").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; + KylinLogExtractor logExtractor = new KylinLogExtractor(); + logger.info("KylinLogExtractor args: " + Arrays.toString(logsArgs)); + logExtractor.execute(logsArgs); + } + }); + + executorService.shutdown(); + try { + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException("Diagnosis info dump interrupted.", e); + } } }