minor, more verbose on job diag
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5155994e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5155994e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5155994e

Branch: refs/heads/master-hbase0.98
Commit: 5155994e1a84e47d9ce8d90d3787a28910673b66
Parents: e4ed232
Author: lidongsjtu <[email protected]>
Authored: Wed Mar 15 14:44:17 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Mar 15 14:51:55 2017 +0800

----------------------------------------------------------------------
 build/bin/diag.sh                             |   4 +-
 build/conf/kylin-tools-log4j.properties       |   2 +-
 .../apache/kylin/tool/MrJobInfoExtractor.java | 208 +++++++++++++------
 3 files changed, 149 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5155994e/build/bin/diag.sh
----------------------------------------------------------------------
diff --git a/build/bin/diag.sh b/build/bin/diag.sh
index a995774..bb1bdc4 100644
--- a/build/bin/diag.sh
+++ b/build/bin/diag.sh
@@ -52,14 +52,14 @@ then
     if [ ${#patient} -eq 36 ]; then
         hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-tool-log4j.properties \
+        -Dlog4j.configuration=file:${KYLIN_HOME}/conf/kylin-tools-log4j.properties \
         -Dcatalina.home=${tomcat_root} \
         org.apache.kylin.tool.JobDiagnosisInfoCLI \
         -jobId $patient \
         -destDir $destDir || exit 1
    else
         hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-server-log4j.properties \
+        -Dlog4j.configuration=file:${KYLIN_HOME}/conf/kylin-tools-log4j.properties \
         -Dcatalina.home=${tomcat_root} \
         org.apache.kylin.tool.DiagnosisInfoCLI \
         -project -all \
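
The "file:" prefix in the new setting matters: log4j 1.x first tries to parse the
value of -Dlog4j.configuration as a URL, and only falls back to a classpath-resource
lookup when URL parsing fails. The old values (kylin-tool-log4j.properties and
kylin-server-log4j.properties) were bare resource names that had to be found on the
hbase classpath; the new value loads the properties file under ${KYLIN_HOME}/conf
directly from disk. A minimal standalone sketch of that resolution order (not part
of this commit; the example path is illustrative):

    import java.net.MalformedURLException;
    import java.net.URL;

    import org.apache.log4j.Logger;
    import org.apache.log4j.PropertyConfigurator;

    // Standalone sketch mirroring how log4j 1.x default initialization
    // resolves the -Dlog4j.configuration system property.
    public class Log4jConfigResolution {
        public static void main(String[] args) {
            String value = System.getProperty("log4j.configuration");
            if (value != null) {
                try {
                    // URL form, e.g. file:/usr/local/kylin/conf/kylin-tools-log4j.properties
                    PropertyConfigurator.configure(new URL(value));
                } catch (MalformedURLException e) {
                    // bare name, e.g. kylin-tools-log4j.properties: resolved on the classpath
                    URL resource = Thread.currentThread().getContextClassLoader().getResource(value);
                    if (resource != null) {
                        PropertyConfigurator.configure(resource);
                    }
                }
            }
            Logger.getLogger(Log4jConfigResolution.class).info("log4j configured");
        }
    }
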
http://git-wip-us.apache.org/repos/asf/kylin/blob/5155994e/build/conf/kylin-tools-log4j.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-tools-log4j.properties b/build/conf/kylin-tools-log4j.properties
index 2ccd772..d47f9a2 100644
--- a/build/conf/kylin-tools-log4j.properties
+++ b/build/conf/kylin-tools-log4j.properties
@@ -35,4 +35,4 @@ log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: %
 #log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.commons.httpclient=WARN
+log4j.logger.org.apache.kylin.tool.shaded=INFO
"/conf/"; - String response = getHttpResponse(confUrl); - FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), response, Charset.defaultCharset()); + response = getHttpResponse(url); + FileUtils.forceMkdir(dest.getParentFile()); + FileUtils.writeStringToFile(dest, response, Charset.defaultCharset()); + return response; } catch (Exception e) { - logger.warn("Failed to get job conf rest response.", e); + logger.warn("Failed to get http response from {}.", url, e); } + + return response; } @Override protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception { try { - boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true; + boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true; String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID); - String jobUrlPrefix = getRestCheckUrl() + "/ws/v1/history/mapreduce/jobs/" + mrJobId; + String jobUrlBase = getRestCheckUrl(); + String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId; - if (includeTaskCounter) { - extractTaskCounters(exportDir, jobUrlPrefix); - } - extractJobCounters(exportDir, jobUrlPrefix); - extractJobTasks(exportDir, jobUrlPrefix); - extractJobConf(exportDir, jobUrlPrefix); - } catch (Exception e) { - logger.warn("Failed to get mr tasks rest response.", e); - } - } + // save mr job stats + String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix); + String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue(); - private void extractJobCounters(File exportDir, String jobUrlPrefix) { - String url = jobUrlPrefix + "/counters"; - String response = getHttpResponse(url); - try { - FileUtils.writeStringToFile(new File(exportDir, "job_counters.json"), response, Charset.defaultCharset()); - } catch (Exception e) { - logger.warn("Failed to get mr counters rest response.", e); - } - } + String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts"); + String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue(); + + // save mr job conf + saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf"); + + // save mr job counters + saveHttpResponseQuietly(new File(exportDir, "job_counters.json"), jobUrlPrefix + "/counters"); + + // save task details + if (includeTaskDetails) { + extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user); + } - private void extractJobTasks(File exportDir, String jobUrlPrefix) { - String url = jobUrlPrefix + "/tasks"; - String response = getHttpResponse(url); - try { - FileUtils.writeStringToFile(new File(exportDir, "job_tasks.json"), response, Charset.defaultCharset()); } catch (Exception e) { - logger.warn("Failed to get mr counters rest response.", e); + logger.warn("Failed to get mr tasks rest response.", e); } } - private void extractTaskCounters(File exportDir, String jobUrlPrefix) { + private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) { try { String tasksUrl = jobUrlPrefix + "/tasks/"; - String tasksResponse = getHttpResponse(tasksUrl); + String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl); 
     @Override
     protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
         try {
-            boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true;
+            boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
             String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
-            String jobUrlPrefix = getRestCheckUrl() + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+            String jobUrlBase = getRestCheckUrl();
+            String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
 
-            if (includeTaskCounter) {
-                extractTaskCounters(exportDir, jobUrlPrefix);
-            }
-            extractJobCounters(exportDir, jobUrlPrefix);
-            extractJobTasks(exportDir, jobUrlPrefix);
-            extractJobConf(exportDir, jobUrlPrefix);
-        } catch (Exception e) {
-            logger.warn("Failed to get mr tasks rest response.", e);
-        }
-    }
+            // save mr job stats
+            String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
+            String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
 
-    private void extractJobCounters(File exportDir, String jobUrlPrefix) {
-        String url = jobUrlPrefix + "/counters";
-        String response = getHttpResponse(url);
-        try {
-            FileUtils.writeStringToFile(new File(exportDir, "job_counters.json"), response, Charset.defaultCharset());
-        } catch (Exception e) {
-            logger.warn("Failed to get mr counters rest response.", e);
-        }
-    }
+            String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
+            String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
+
+            // save mr job conf
+            saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
+
+            // save mr job counters
+            saveHttpResponseQuietly(new File(exportDir, "job_counters.json"), jobUrlPrefix + "/counters");
+
+            // save task details
+            if (includeTaskDetails) {
+                extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+            }
 
-    private void extractJobTasks(File exportDir, String jobUrlPrefix) {
-        String url = jobUrlPrefix + "/tasks";
-        String response = getHttpResponse(url);
-        try {
-            FileUtils.writeStringToFile(new File(exportDir, "job_tasks.json"), response, Charset.defaultCharset());
         } catch (Exception e) {
-            logger.warn("Failed to get mr counters rest response.", e);
+            logger.warn("Failed to get mr tasks rest response.", e);
         }
     }
 
-    private void extractTaskCounters(File exportDir, String jobUrlPrefix) {
+    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
-            String tasksResponse = getHttpResponse(tasksUrl);
+            String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
             JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
 
+            // find the first start map and reduce
+            String firstStartMapId = null;
+            String firstStartReduceId = null;
+            long firstStartMapTime = Long.MAX_VALUE;
+            long firstStartReduceTime = Long.MAX_VALUE;
+
+            // find the first end map and reduce
+            String firstEndMapId = null;
+            String firstEndReduceId = null;
+            long firstEndMapTime = Long.MAX_VALUE;
+            long firstEndReduceTime = Long.MAX_VALUE;
+
+            // find the last start map and reduce
+            String lastStartMapId = null;
+            String lastStartReduceId = null;
+            long lastStartMapTime = 0L;
+            long lastStartReduceTime = 0L;
+
+            // find the last end map and reduce
+            String lastEndMapId = null;
+            String lastEndReduceId = null;
+            long lastEndMapTime = 0L;
+            long lastEndReduceTime = 0L;
+
             // find the max map and reduce duration
             String maxReduceId = null;
             String maxMapId = null;
@@ -192,14 +238,10 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             // find the min map and reduce duration
             String minReduceId = null;
             String minMapId = null;
-            long minMapElapsedTime = Integer.MAX_VALUE;
-            long minReduceElapsedTime = Integer.MAX_VALUE;
-
-            // find a normal map and reduce duration (the first one)
-            String normReduceId = null;
-            String normMapId = null;
-            long normMapElapsedTime = 0;
-            long normReduceElapsedTime = 0;
+            long minMapElapsedTime = Long.MAX_VALUE;
+            long minReduceElapsedTime = Long.MAX_VALUE;
+
+            Set<String> selectedTaskIds = Sets.newHashSet();
 
             for (JsonNode node : tasks) {
                 if (node.get("type").textValue().equals("MAP")) {
                     if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
@@ -212,11 +254,27 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
                         minMapId = node.get("id").textValue();
                     }
 
-                    if (normMapElapsedTime == 0) {
-                        normMapElapsedTime = node.get("elapsedTime").longValue();
-                        normMapId = node.get("id").textValue();
+                    if (node.get("startTime").longValue() <= firstStartMapTime) {
+                        firstStartMapTime = node.get("startTime").longValue();
+                        firstStartMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("startTime").longValue() >= lastStartMapTime) {
+                        lastStartMapTime = node.get("startTime").longValue();
+                        lastStartMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() <= firstEndMapTime) {
+                        firstEndMapTime = node.get("finishTime").longValue();
+                        firstEndMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() >= lastEndMapTime) {
+                        lastEndMapTime = node.get("finishTime").longValue();
+                        lastEndMapId = node.get("id").textValue();
                     }
                 }
+
                 if (node.get("type").textValue().equals("REDUCE")) {
                     if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) {
                         maxReduceElapsedTime = node.get("elapsedTime").longValue();
@@ -228,20 +286,46 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
                         minReduceId = node.get("id").textValue();
                     }
 
-                    if (normReduceElapsedTime == 0) {
-                        normReduceElapsedTime = node.get("elapsedTime").longValue();
-                        normReduceId = node.get("id").textValue();
+                    if (node.get("startTime").longValue() <= firstStartReduceTime) {
+                        firstStartReduceTime = node.get("startTime").longValue();
+                        firstStartReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("startTime").longValue() >= lastStartReduceTime) {
+                        lastStartReduceTime = node.get("startTime").longValue();
+                        lastStartReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() <= firstEndReduceTime) {
+                        firstEndReduceTime = node.get("finishTime").longValue();
+                        firstEndReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() >= lastEndReduceTime) {
+                        lastEndReduceTime = node.get("finishTime").longValue();
+                        lastEndReduceId = node.get("id").textValue();
                     }
                 }
             }
-            File counterDir = new File(exportDir, "counters");
-            FileUtils.forceMkdir(counterDir);
-            extractTaskCounter(maxMapId, counterDir, tasksUrl, "max");
-            extractTaskCounter(maxReduceId, counterDir, tasksUrl, "max");
-            extractTaskCounter(minMapId, counterDir, tasksUrl, "min");
-            extractTaskCounter(minReduceId, counterDir, tasksUrl, "min");
-            extractTaskCounter(normMapId, counterDir, tasksUrl, "norm");
-            extractTaskCounter(normReduceId, counterDir, tasksUrl, "norm");
+
+            selectedTaskIds.add(maxMapId);
+            selectedTaskIds.add(maxReduceId);
+            selectedTaskIds.add(minMapId);
+            selectedTaskIds.add(minReduceId);
+            selectedTaskIds.add(firstStartMapId);
+            selectedTaskIds.add(firstStartReduceId);
+            selectedTaskIds.add(lastStartMapId);
+            selectedTaskIds.add(lastStartReduceId);
+            selectedTaskIds.add(firstEndMapId);
+            selectedTaskIds.add(firstEndReduceId);
+            selectedTaskIds.add(lastEndMapId);
+            selectedTaskIds.add(lastEndReduceId);
+
+            File tasksDir = new File(exportDir, "tasks");
+            FileUtils.forceMkdir(tasksDir);
+            for (String taskId : selectedTaskIds) {
+                extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+            }
         } catch (Exception e) {
            logger.warn("Failed to get mr tasks rest response.", e);
        }
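
With this change the extractor no longer dumps only max/min/"norm" task counters: it
keeps up to twelve representative tasks per job (first/last start, first/last finish,
and min/max elapsed time, for maps and reduces separately) and pulls full details for
each. Collecting the winners in a Set means a task that tops several categories is
fetched only once. A condensed standalone sketch of that dedup behavior (sample
numbers are made up):

    import java.util.HashSet;
    import java.util.Set;

    // Standalone sketch of the selection strategy in extractTaskDetails.
    public class RepresentativeTaskSelection {
        public static void main(String[] args) {
            // hypothetical tasks: {id, startTime, finishTime, elapsedTime}
            long[][] tasks = { { 0, 100, 900, 800 }, { 1, 150, 300, 150 }, { 2, 200, 500, 300 } };

            long firstStart = Long.MAX_VALUE, lastStart = 0L, firstEnd = Long.MAX_VALUE, lastEnd = 0L;
            long minElapsed = Long.MAX_VALUE, maxElapsed = 0L;
            long firstStartId = -1, lastStartId = -1, firstEndId = -1, lastEndId = -1, minId = -1, maxId = -1;

            for (long[] t : tasks) {
                if (t[1] <= firstStart) { firstStart = t[1]; firstStartId = t[0]; }
                if (t[1] >= lastStart) { lastStart = t[1]; lastStartId = t[0]; }
                if (t[2] <= firstEnd) { firstEnd = t[2]; firstEndId = t[0]; }
                if (t[2] >= lastEnd) { lastEnd = t[2]; lastEndId = t[0]; }
                if (t[3] <= minElapsed) { minElapsed = t[3]; minId = t[0]; }
                if (t[3] >= maxElapsed) { maxElapsed = t[3]; maxId = t[0]; }
            }

            Set<Long> selected = new HashSet<Long>();
            selected.add(firstStartId);
            selected.add(lastStartId);
            selected.add(firstEndId);
            selected.add(lastEndId);
            selected.add(minId);
            selected.add(maxId);

            // Six category winners collapse to three distinct tasks: task 0 is the
            // first to start, last to finish and slowest; task 1 finishes first and
            // is the fastest; task 2 is the last to start.
            System.out.println(selected); // e.g. [0, 1, 2]
        }
    }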

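For reference, every URL the reworked extractor hits is a standard MapReduce History
Server REST resource rooted at the address returned by getRestCheckUrl() (port 19888
by default). A minimal standalone sketch of the same walk with plain
HttpURLConnection (host and job id below are placeholders, not from this commit):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    // Standalone sketch of the History Server REST resources used above.
    public class HistoryServerWalk {
        static String get(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestProperty("Accept", "application/json");
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                StringBuilder sb = new StringBuilder();
                for (String line; (line = in.readLine()) != null; ) {
                    sb.append(line).append('\n');
                }
                return sb.toString();
            } finally {
                conn.disconnect();
            }
        }

        public static void main(String[] args) throws Exception {
            String base = "http://historyserver.example.com:19888"; // placeholder host
            String jobId = args.length > 0 ? args[0] : "job_1400000000000_0001"; // placeholder id
            String job = base + "/ws/v1/history/mapreduce/jobs/" + jobId;

            System.out.println(get(job));                  // -> job.json (user, times, state)
            System.out.println(get(job + "/jobattempts")); // -> job_attempts.json (nodeId)
            System.out.println(get(job + "/conf"));        // -> job_conf.json
            System.out.println(get(job + "/counters"));    // -> job_counters.json
            System.out.println(get(job + "/tasks/"));      // -> job_tasks.json (task list)

            // Per selected task, the extractor then drills into:
            //   {job}/tasks/{taskId}                      -> task.json (successfulAttempt)
            //   {job}/tasks/{taskId}/attempts/{attemptId} -> task_attempts.json (assignedContainerId)
            //   {job}/tasks/{taskId}/counters             -> task_counters.json
            //   {base}/jobhistory/logs/{nodeId}/{containerId}/{attemptId}/{user}/syslog/?start=0
            //                                             -> task_log.txt
        }
    }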