Repository: kylin
Updated Branches:
  refs/heads/master 3bdafb1a1 -> 8615be974


minor, parallelize diagnosis tool


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8615be97
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8615be97
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8615be97

Branch: refs/heads/master
Commit: 8615be974cfa99f54ceeb995ff2122dec0ded636
Parents: 3bdafb1
Author: lidongsjtu <lid...@apache.org>
Authored: Thu Jul 21 21:57:49 2016 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Fri Jul 22 11:01:07 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/tool/DiagnosisInfoCLI.java | 119 ++++++++++++++-----
 1 file changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8615be97/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index cf563a5..3e24868 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -22,6 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -55,6 +58,13 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor {
     @SuppressWarnings("static-access")
     private static final Option OPTION_INCLUDE_JOB = 
OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Specify
 whether to include job info to extract. Default true.").create("includeJobs");
 
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_THREADS = 
OptionBuilder.withArgName("threads").hasArg().isRequired(false).withDescription("Specify
 number of threads for parallel extraction.").create("threads");
+
+    private static final int DEFAULT_PARALLEL_SIZE = 4;
+
+    private ExecutorService executorService;
+
     public DiagnosisInfoCLI() {
         super();
 
@@ -65,6 +75,7 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor {
         options.addOption(OPTION_INCLUDE_HBASE);
         options.addOption(OPTION_INCLUDE_CLIENT);
         options.addOption(OPTION_INCLUDE_JOB);
+
     }
 
     public static void main(String[] args) {
@@ -89,57 +100,103 @@ public class DiagnosisInfoCLI extends 
AbstractInfoExtractor {
     }
 
     @Override
-    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) 
throws IOException {
+    protected void executeExtract(final OptionsHelper optionsHelper, final 
File exportDir) throws IOException {
         final String projectInput = 
optionsHelper.getOptionValue(options.getOption("project"));
-        boolean includeConf = optionsHelper.hasOption(OPTION_INCLUDE_CONF) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CONF)) : true;
-        boolean includeHBase = optionsHelper.hasOption(OPTION_INCLUDE_HBASE) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_HBASE)) : true;
-        boolean includeClient = optionsHelper.hasOption(OPTION_INCLUDE_CLIENT) 
? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CLIENT)) : true;
-        boolean includeJob = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
-        String projectNames = StringUtils.join(getProjects(projectInput), ",");
+        final boolean includeConf = 
optionsHelper.hasOption(OPTION_INCLUDE_CONF) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CONF)) : true;
+        final boolean includeHBase = 
optionsHelper.hasOption(OPTION_INCLUDE_HBASE) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_HBASE)) : true;
+        final boolean includeClient = 
optionsHelper.hasOption(OPTION_INCLUDE_CLIENT) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CLIENT)) : true;
+        final boolean includeJob = optionsHelper.hasOption(OPTION_INCLUDE_JOB) 
? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
+        final int threadsNum = optionsHelper.hasOption(OPTION_THREADS) ? 
Integer.valueOf(optionsHelper.getOptionValue(OPTION_THREADS)) : 
DEFAULT_PARALLEL_SIZE;
+        final String projectNames = 
StringUtils.join(getProjects(projectInput), ",");
+
+        logger.info("Start diagnosis info extraction in {} threads.", 
threadsNum);
+        executorService = Executors.newFixedThreadPool(threadsNum);
 
         // export cube metadata
-        String[] cubeMetaArgs = { "-destDir", new File(exportDir, 
"metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", 
"-includeJobs", "false", "-submodule", "true" };
-        CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor();
-        logger.info("CubeMetaExtractor args: " + 
Arrays.toString(cubeMetaArgs));
-        cubeMetaExtractor.execute(cubeMetaArgs);
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                String[] cubeMetaArgs = { "-destDir", new File(exportDir, 
"metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", 
"-includeJobs", "false", "-submodule", "true" };
+                CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor();
+                logger.info("CubeMetaExtractor args: " + 
Arrays.toString(cubeMetaArgs));
+                cubeMetaExtractor.execute(cubeMetaArgs);
+            }
+        });
 
         // extract all job instances
         if (includeJob) {
-            String[] jobArgs = { "-destDir", new File(exportDir, 
"jobs").getAbsolutePath(), "-compress", "false", "-submodule", "true" };
-            JobInstanceExtractor jobInstanceExtractor = new 
JobInstanceExtractor();
-            jobInstanceExtractor.execute(jobArgs);
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    String[] jobArgs = { "-destDir", new File(exportDir, 
"jobs").getAbsolutePath(), "-compress", "false", "-submodule", "true" };
+                    JobInstanceExtractor jobInstanceExtractor = new 
JobInstanceExtractor();
+                    jobInstanceExtractor.execute(jobArgs);
+                }
+            });
         }
 
         // export HBase
         if (includeHBase) {
-            String[] hbaseArgs = { "-destDir", new File(exportDir, 
"hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", 
"-submodule", "true" };
-            HBaseUsageExtractor hBaseUsageExtractor = new 
HBaseUsageExtractor();
-            logger.info("HBaseUsageExtractor args: " + 
Arrays.toString(hbaseArgs));
-            hBaseUsageExtractor.execute(hbaseArgs);
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    String[] hbaseArgs = { "-destDir", new File(exportDir, 
"hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", 
"-submodule", "true" };
+                    HBaseUsageExtractor hBaseUsageExtractor = new 
HBaseUsageExtractor();
+                    logger.info("HBaseUsageExtractor args: " + 
Arrays.toString(hbaseArgs));
+                    hBaseUsageExtractor.execute(hbaseArgs);
+                }
+            });
         }
 
         // export conf
         if (includeConf) {
-            logger.info("Start to extract kylin conf files.");
-            try {
-                FileUtils.copyDirectoryToDirectory(new 
File(ToolUtil.getConfFolder()), exportDir);
-            } catch (Exception e) {
-                logger.warn("Error in export conf.", e);
-            }
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    logger.info("Start to extract kylin conf files.");
+                    try {
+                        FileUtils.copyDirectoryToDirectory(new 
File(ToolUtil.getConfFolder()), exportDir);
+                    } catch (Exception e) {
+                        logger.warn("Error in export conf.", e);
+                    }
+                }
+            });
+
         }
 
         // export client
         if (includeClient) {
-            String[] clientArgs = { "-destDir", new File(exportDir, 
"client").getAbsolutePath(), "-compress", "false", "-submodule", "true" };
-            ClientEnvExtractor clientEnvExtractor = new ClientEnvExtractor();
-            logger.info("ClientEnvExtractor args: " + 
Arrays.toString(clientArgs));
-            clientEnvExtractor.execute(clientArgs);
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        String[] clientArgs = { "-destDir", new 
File(exportDir, "client").getAbsolutePath(), "-compress", "false", 
"-submodule", "true" };
+                        ClientEnvExtractor clientEnvExtractor = new 
ClientEnvExtractor();
+                        logger.info("ClientEnvExtractor args: " + 
Arrays.toString(clientArgs));
+                        clientEnvExtractor.execute(clientArgs);
+                    } catch (IOException e) {
+                        logger.warn("Error in export client info.", e);
+                    }
+                }
+            });
         }
 
         // export logs
-        String[] logsArgs = { "-destDir", new File(exportDir, 
"logs").getAbsolutePath(), "-compress", "false", "-submodule", "true" };
-        KylinLogExtractor logExtractor = new KylinLogExtractor();
-        logger.info("KylinLogExtractor args: " + Arrays.toString(logsArgs));
-        logExtractor.execute(logsArgs);
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                String[] logsArgs = { "-destDir", new File(exportDir, 
"logs").getAbsolutePath(), "-compress", "false", "-submodule", "true" };
+                KylinLogExtractor logExtractor = new KylinLogExtractor();
+                logger.info("KylinLogExtractor args: " + 
Arrays.toString(logsArgs));
+                logExtractor.execute(logsArgs);
+            }
+        });
+
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Diagnosis info dump interrupted.", e);
+        }
     }
 }

Reply via email to