Repository: kylin

Updated Branches:
  refs/heads/yang23 ecdcf3a14 -> 696f94a97
minor, job diag support different hadoop env

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/696f94a9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/696f94a9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/696f94a9

Branch: refs/heads/yang23
Commit: 696f94a9715d5c4be47ce3051aec4014f62ec36a
Parents: ecdcf3a
Author: lidongsjtu <lid...@apache.org>
Authored: Thu Mar 16 16:48:02 2017 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Thu Mar 16 16:48:02 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 51 ++++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/696f94a9/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index b9bf2de..ca4c7e1 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class MrJobInfoExtractor extends AbstractInfoExtractor {
@@ -59,6 +61,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
     private static final int HTTP_RETRY = 3;
 
+    private Map<String, String> nodeInfoMap = Maps.newHashMap();
+
+    private String jobHistoryUrlBase;
+    private String yarnMasterUrlBase;
+
     public MrJobInfoExtractor() {
         packageType = "MR";
 
@@ -71,14 +78,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         extractor.execute(args);
     }
 
-    private String getRestCheckUrl() {
+    private void extractRestCheckUrl() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        Pattern pattern = Pattern.compile("(http://)(.*):.*");
+        Pattern pattern = Pattern.compile("(http://)([^:]*):([^/])*.*");
         if (yarnStatusCheckUrl != null) {
             Matcher m = pattern.matcher(yarnStatusCheckUrl);
             if (m.matches()) {
-                return m.group(1) + m.group(2) + ":19888";
+                jobHistoryUrlBase = m.group(1) + m.group(2) + ":19888";
+                yarnMasterUrlBase = m.group(1) + m.group(2) + ":" + m.group(3);
             }
         }
         logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
@@ -91,14 +99,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
         }
         if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
+            return;
         }
         if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
             rmWebHost = "http://" + rmWebHost;
         }
         Matcher m = pattern.matcher(rmWebHost);
         Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
-        return m.group(1) + m.group(2) + ":19888";
+        yarnMasterUrlBase = rmWebHost;
+        jobHistoryUrlBase = m.group(1) + HAUtil.getConfValueForRMInstance("mapreduce.jobhistory.webapp.address", m.group(2) + ":19888", conf);
     }
 
     private String getHttpResponse(String url) {
@@ -122,7 +131,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         return msg;
     }
 
-    private void extractTaskDetail(String taskId, String nodeId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
+    private void extractTaskDetail(String taskId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
         try {
             if (StringUtils.isEmpty(taskId)) {
                 return;
@@ -137,8 +146,9 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
             String succAttemptId = taskAttempt.textValue();
             String attemptInfo = saveHttpResponseQuietly(new File(destDir, "task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
-            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
-            String containerId = attemptAttempt.textValue();
+            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
+            String containerId = attemptAttempt.get("assignedContainerId").textValue();
+            String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());
 
             // save task counters
             saveHttpResponseQuietly(new File(destDir, "task_counters.json"), taskUrlBase + "/counters");
@@ -173,16 +183,25 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         try {
            boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
             String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
-            String jobUrlBase = getRestCheckUrl();
-            String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+            extractRestCheckUrl();
+
+            Preconditions.checkNotNull(jobHistoryUrlBase);
+            Preconditions.checkNotNull(yarnMasterUrlBase);
+
+            String jobUrlPrefix = jobHistoryUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
+            // cache node info
+            String nodeUrl = yarnMasterUrlBase + "/ws/v1/cluster/nodes";
+            String nodeResponse = getHttpResponse(nodeUrl);
+            JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
+            for (JsonNode node : nodes) {
+                nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
+            }
 
             // save mr job stats
             String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
             String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
 
-            String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
-            String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
-
             // save mr job conf
             saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
 
@@ -191,7 +210,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
             // save task details
             if (includeTaskDetails) {
-                extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+                extractTaskDetails(exportDir, jobUrlPrefix, jobHistoryUrlBase, user);
             }
 
         } catch (Exception e) {
@@ -199,7 +218,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         }
     }
 
-    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
+    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String user) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
             String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
@@ -324,7 +343,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             File tasksDir = new File(exportDir, "tasks");
             FileUtils.forceMkdir(tasksDir);
             for (String taskId : selectedTaskIds) {
-                extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+                extractTaskDetail(taskId, user, tasksDir, tasksUrl, jobUrlBase);
             }
         } catch (Exception e) {
             logger.warn("Failed to get mr tasks rest response.", e);
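----------------------------------------------------------------------

For reference, a minimal, self-contained sketch (not part of the commit) of the node-info
caching introduced above: it builds the nodeHTTPAddress -> node id map from a YARN
/ws/v1/cluster/nodes response and then resolves the nodeHttpAddress reported for a task
attempt against that map, which is how extractTaskDetail now derives nodeId instead of
reading it from job_attempts.json. The JSON payloads, host names, and class name below are
hypothetical sample data.

import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;

public class NodeInfoLookupSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for getHttpResponse(yarnMasterUrlBase + "/ws/v1/cluster/nodes")
        String nodeResponse = "{\"nodes\":{\"node\":["
                + "{\"id\":\"node1.example.com:45454\",\"nodeHTTPAddress\":\"node1.example.com:8042\"},"
                + "{\"id\":\"node2.example.com:45454\",\"nodeHTTPAddress\":\"node2.example.com:8042\"}]}}";

        // Cache node info once per job, as the extractor now does before walking the tasks
        Map<String, String> nodeInfoMap = Maps.newHashMap();
        JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
        for (JsonNode node : nodes) {
            nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
        }

        // Stand-in for the job-history "taskAttempt" payload fetched per task attempt
        String attemptInfo = "{\"taskAttempt\":{"
                + "\"assignedContainerId\":\"container_1489000000000_0001_01_000002\","
                + "\"nodeHttpAddress\":\"node2.example.com:8042\"}}";

        JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
        String containerId = attemptAttempt.get("assignedContainerId").textValue();
        String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());

        // Prints: container_1489000000000_0001_01_000002 ran on node2.example.com:45454
        System.out.println(containerId + " ran on " + nodeId);
    }
}

The lookup is keyed by the node's HTTP address because that is the field the job-history
taskAttempt payload exposes, while the node id is presumably what the later per-container
log collection needs; resolving it through the cluster nodes API rather than through
job_attempts.json matches the commit's goal of supporting differently configured Hadoop
environments.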