minor, add MR job counters in job diagnosis
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d4768c12 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d4768c12 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d4768c12 Branch: refs/heads/master-hbase0.98 Commit: d4768c12e423e96679e4d7db73c697f0378c0a94 Parents: e7510c2 Author: lidongsjtu <[email protected]> Authored: Tue Mar 14 23:03:35 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Tue Mar 14 23:05:24 2017 +0800 ---------------------------------------------------------------------- build/bin/diag.sh | 2 +- build/conf/kylin-tools-log4j.properties | 1 + .../apache/kylin/tool/JobDiagnosisInfoCLI.java | 6 +- .../apache/kylin/tool/MrJobInfoExtractor.java | 107 ++++++++++++++----- 4 files changed, 86 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/bin/diag.sh ---------------------------------------------------------------------- diff --git a/build/bin/diag.sh b/build/bin/diag.sh index e9354a2..a995774 100644 --- a/build/bin/diag.sh +++ b/build/bin/diag.sh @@ -52,7 +52,7 @@ then if [ ${#patient} -eq 36 ]; then hbase ${KYLIN_EXTRA_START_OPTS} \ - -Dlog4j.configuration=kylin-server-log4j.properties \ + -Dlog4j.configuration=kylin-tool-log4j.properties \ -Dcatalina.home=${tomcat_root} \ org.apache.kylin.tool.JobDiagnosisInfoCLI \ -jobId $patient \ http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/conf/kylin-tools-log4j.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin-tools-log4j.properties b/build/conf/kylin-tools-log4j.properties index e975d18..2ccd772 100644 --- a/build/conf/kylin-tools-log4j.properties +++ b/build/conf/kylin-tools-log4j.properties @@ -35,3 +35,4 @@ log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: % 
#log4j.logger.org.apache.hadoop=ERROR log4j.logger.org.apache.kylin=DEBUG log4j.logger.org.springframework=WARN +log4j.logger.org.apache.commons.httpclient=WARN http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java index 638d97b..04dbef7 100644 --- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java @@ -192,10 +192,10 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor { private void extractJobInfo(String taskId, File destDir) throws Exception { final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo(); if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) { - String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID); + String mrJobId = jobInfo.get(ExecutableConstants.MR_JOB_ID); FileUtils.forceMkdir(destDir); - String[] mrJobArgs = { "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" }; - new MrJobInfoExtractor(jobId).execute(mrJobArgs); + String[] mrJobArgs = { "-mrJobId", mrJobId, "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" }; + new MrJobInfoExtractor().execute(mrJobArgs); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java index ea19885..55b54a5 100644 --- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java @@ 
-44,22 +44,29 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; public class MrJobInfoExtractor extends AbstractInfoExtractor { - private String mrJobId; - private String jobUrlPrefix; - private static final Logger logger = LoggerFactory.getLogger(MrJobInfoExtractor.class); @SuppressWarnings("static-access") - private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default false.").create("includeCounters"); + private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default true.").create("includeCounters"); + + @SuppressWarnings("static-access") + private static final Option OPTION_MR_JOB_ID = OptionBuilder.withArgName("mrJobId").hasArg().isRequired(false).withDescription("Specify MR Job Id").create("mrJobId"); - private final int HTTP_RETRY = 3; + private static final int HTTP_RETRY = 3; + + public MrJobInfoExtractor() { + packageType = "MR"; + + options.addOption(OPTION_INCLUDE_COUNTERS); + options.addOption(OPTION_MR_JOB_ID); + } - public MrJobInfoExtractor(String mrJobId) { - this.mrJobId = mrJobId; - String historyServerUrl = getRestCheckUrl(); - this.jobUrlPrefix = historyServerUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId; + public static void main(String[] args) { + MrJobInfoExtractor extractor = new MrJobInfoExtractor(); + extractor.execute(args); } private String getRestCheckUrl() { @@ -84,13 +91,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor { if (StringUtils.isEmpty(rmWebHost)) { return null; } - if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) { - //do nothing - } else { + if 
(!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) { rmWebHost = "http://" + rmWebHost; } Matcher m = pattern.matcher(rmWebHost); - m.matches(); + Preconditions.checkArgument(m.matches(), "Yarn master URL not found."); return m.group(1) + m.group(2) + ":19888"; } @@ -115,19 +120,17 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor { return msg; } - private void extractTaskCounter(String taskId, File exportDir, String taskUrl) throws IOException { + private void extractTaskCounter(String taskId, File exportDir, String taskUrl, String id) throws IOException { try { String response = getHttpResponse(taskUrl + taskId + "/counters"); - FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response, Charset.defaultCharset()); + FileUtils.writeStringToFile(new File(exportDir, id + "_" + taskId + ".json"), response, Charset.defaultCharset()); } catch (Exception e) { logger.warn("Failed to get task counters rest response" + e); } } - private void extractJobConf(File exportDir) throws IOException { + private void extractJobConf(File exportDir, String jobUrlPrefix) throws IOException { try { - String jobResponse = getHttpResponse(jobUrlPrefix); - JsonNode job = new ObjectMapper().readTree(jobResponse).path("job").get("state"); String confUrl = jobUrlPrefix + "/conf/"; String response = getHttpResponse(confUrl); FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), response, Charset.defaultCharset()); @@ -139,47 +142,99 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor { @Override protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception { try { - boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : false; + boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true; + String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID); + String jobUrlPrefix = getRestCheckUrl() + "/ws/v1/history/mapreduce/jobs/" + mrJobId; + if (includeTaskCounter) { - extractTaskCounters(exportDir); + extractTaskCounters(exportDir, jobUrlPrefix); } - extractJobConf(exportDir); + extractJobCounters(exportDir, jobUrlPrefix); + extractJobConf(exportDir, jobUrlPrefix); } catch (Exception e) { logger.warn("Failed to get mr tasks rest response.", e); } } - private void extractTaskCounters(File exportDir) { + private void extractJobCounters(File exportDir, String jobUrlPrefix) { + String url = jobUrlPrefix + "/counters"; + String response = getHttpResponse(url); + try { + File counterDir = new File(exportDir, "counters"); + FileUtils.forceMkdir(counterDir); + FileUtils.writeStringToFile(new File(exportDir, "job_counters.json"), response, Charset.defaultCharset()); + } catch (Exception e) { + logger.warn("Failed to get mr counters rest response.", e); + } + } + + private void extractTaskCounters(File exportDir, String jobUrlPrefix) { try { String tasksUrl = jobUrlPrefix + "/tasks/"; String tasksResponse = getHttpResponse(tasksUrl); JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task"); + // find the max map and reduce duration String maxReduceId = null; String maxMapId = null; long maxMapElapsedTime = 0L; long maxReduceElapsedTime = 0L; + // find the min map and reduce duration + String minReduceId = null; + String minMapId = null; + long minMapElapsedTime = Integer.MAX_VALUE; + long minReduceElapsedTime = Integer.MAX_VALUE; + + // find a normal map and reduce duration (the first one) + String normReduceId = null; + String normMapId = null; + long normMapElapsedTime = 0; + long normReduceElapsedTime = 0; for (JsonNode node : tasks) { if (node.get("type").textValue().equals("MAP")) { if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
maxMapElapsedTime = node.get("elapsedTime").longValue(); maxMapId = node.get("id").textValue(); } + + if (node.get("elapsedTime").longValue() <= minMapElapsedTime) { + minMapElapsedTime = node.get("elapsedTime").longValue(); + minMapId = node.get("id").textValue(); + } + + if (normMapElapsedTime == 0) { + normMapElapsedTime = node.get("elapsedTime").longValue(); + normMapId = node.get("id").textValue(); + } } if (node.get("type").textValue().equals("REDUCE")) { if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) { maxReduceElapsedTime = node.get("elapsedTime").longValue(); maxReduceId = node.get("id").textValue(); } + + if (node.get("elapsedTime").longValue() <= minReduceElapsedTime) { + minReduceElapsedTime = node.get("elapsedTime").longValue(); + minReduceId = node.get("id").textValue(); + } + + if (normReduceElapsedTime == 0) { + normReduceElapsedTime = node.get("elapsedTime").longValue(); + normReduceId = node.get("id").textValue(); + } } } File counterDir = new File(exportDir, "counters"); FileUtils.forceMkdir(counterDir); - extractTaskCounter(maxMapId, counterDir, tasksUrl); - extractTaskCounter(maxReduceId, counterDir, tasksUrl); + extractTaskCounter(maxMapId, counterDir, tasksUrl, "max"); + extractTaskCounter(maxReduceId, counterDir, tasksUrl, "max"); + extractTaskCounter(minMapId, counterDir, tasksUrl, "min"); + extractTaskCounter(minReduceId, counterDir, tasksUrl, "min"); + extractTaskCounter(normMapId, counterDir, tasksUrl, "norm"); + extractTaskCounter(normReduceId, counterDir, tasksUrl, "norm"); } catch (Exception e) { - logger.warn("Failed to get mr tasks rest response" + e); + logger.warn("Failed to get mr tasks rest response.", e); } } -} +} \ No newline at end of file
