Repository: kylin
Updated Branches:
  refs/heads/yang21-cdh5.7 64b6e78f6 -> db16d74bd (forced update)
minor, extract job conf in diagnosis

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/22069740
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/22069740
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/22069740

Branch: refs/heads/yang21-cdh5.7
Commit: 2206974025bea132098122f58cfd76979313c725
Parents: 6cc7052
Author: lidongsjtu <lid...@apache.org>
Authored: Tue Nov 1 21:58:02 2016 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Tue Nov 1 22:51:12 2016 +0800

----------------------------------------------------------------------
 .../kylin/source/hive/HiveCmdBuilder.java       |   2 +-
 .../apache/kylin/tool/JobDiagnosisInfoCLI.java  |  35 ++--
 .../kylin/tool/JobTaskCounterExtractor.java     | 155 ---------------
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 188 +++++++++++++++++++
 4 files changed, 207 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/22069740/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
index 5a5b4e0..bce85b8 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
@@ -80,7 +80,7 @@ public class HiveCmdBuilder {
             logger.info("The statements to execute in beeline: \n" + hqlBuf);
 
             if (logger.isDebugEnabled()) {
-                logger.debug("THe SQL to execute in beeline: \n" + IOUtils.toString(new FileReader(tmpHql)));
+                logger.debug("The SQL to execute in beeline: \n" + IOUtils.toString(new FileReader(tmpHql)));
             }
         } catch (IOException e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/22069740/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index c8ff1f4..638d97b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -87,7 +87,7 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
 
     @Override
     protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
-        String jobId = optionsHelper.getOptionValue(OPTION_JOB_ID);
+        String kylinJobId = optionsHelper.getOptionValue(OPTION_JOB_ID);
         boolean includeCube = optionsHelper.hasOption(OPTION_INCLUDE_CUBE) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CUBE)) : true;
         boolean includeYarnLogs = optionsHelper.hasOption(OPTION_INCLUDE_YARN_LOGS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_YARN_LOGS)) : true;
         boolean includeClient = optionsHelper.hasOption(OPTION_INCLUDE_CLIENT) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_CLIENT)) : true;
@@ -95,14 +95,14 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
 
         // dump job output
         logger.info("Start to dump job output");
-        ExecutablePO executablePO = executableDao.getJob(jobId);
-        addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + jobId);
-        addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + jobId);
-        for (ExecutablePO task : executablePO.getTasks()) {
-            addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
-            addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
+        ExecutablePO executablePO = executableDao.getJob(kylinJobId);
+        addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + kylinJobId);
+        addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + kylinJobId);
+        for (ExecutablePO kylinTask : executablePO.getTasks()) {
+            addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + kylinTask.getUuid());
+            addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + kylinTask.getUuid());
             if (includeYarnLogs) {
-                yarnLogsResources.add(task.getUuid());
+                yarnLogsResources.add(kylinTask.getUuid());
             }
         }
         extractResources(exportDir);
@@ -121,14 +121,14 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
             }
         }
 
-        // dump yarn logs
+        // dump mr job info
         if (includeYarnLogs) {
-            logger.info("Start to dump yarn job logs: " + jobId);
-            File yarnLogDir = new File(exportDir, "yarn");
-            FileUtils.forceMkdir(yarnLogDir);
+            logger.info("Start to dump mr job info: " + kylinJobId);
+            File yarnDir = new File(exportDir, "yarn");
+            FileUtils.forceMkdir(yarnDir);
             for (String stepId : yarnLogsResources) {
-                extractTaskCounter(stepId, new File(new File(yarnLogDir, stepId), "Counters"));
-                extractYarnLog(stepId, new File(yarnLogDir, stepId), true);
+                extractJobInfo(stepId, new File(yarnDir, stepId));
+                extractJobLog(stepId, new File(yarnDir, stepId), true);
             }
         }
 
@@ -171,7 +171,7 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
         }
     }
 
-    private void extractYarnLog(String taskId, File destDir, boolean onlyFail) throws Exception {
+    private void extractJobLog(String taskId, File destDir, boolean onlyFail) throws Exception {
         final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo();
         FileUtils.forceMkdir(destDir);
         if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) {
@@ -189,12 +189,13 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
         }
     }
 
-    private void extractTaskCounter(String taskId, File destDir) throws Exception {
+    private void extractJobInfo(String taskId, File destDir) throws Exception {
         final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo();
         if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) {
            String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
            FileUtils.forceMkdir(destDir);
-           new JobTaskCounterExtractor(jobId).executeExtract(destDir);
+           String[] mrJobArgs = { "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" };
+           new MrJobInfoExtractor(jobId).execute(mrJobArgs);
         }
     }
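Note: extractJobInfo() above no longer calls the extractor's internals directly; it hands a plain argument array to MrJobInfoExtractor.execute(), the generic entry point inherited from AbstractInfoExtractor. Below is a minimal standalone sketch of an equivalent invocation. The -destDir/-compress/-submodule flags come from the hunk above and -includeCounters from the new class further down; the job id and output path are made up, and it is an assumption that execute() parses these options and then drives executeExtract().

    import org.apache.kylin.tool.MrJobInfoExtractor;

    public class MrJobInfoExtractorExample {
        public static void main(String[] args) throws Exception {
            String mrJobId = "job_1477000000000_0001"; // hypothetical MR job id
            String[] extractorArgs = {
                    "-destDir", "/tmp/diag/yarn/step-1", // made-up output directory
                    "-compress", "false",                // keep extracted files uncompressed
                    "-submodule", "true",                // run nested inside another extractor
                    "-includeCounters", "true"           // also dump the slowest task counters
            };
            // execute() is expected to parse the args and call executeExtract()
            new MrJobInfoExtractor(mrJobId).execute(extractorArgs);
        }
    }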
http://git-wip-us.apache.org/repos/asf/kylin/blob/22069740/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
deleted file mode 100644
index 6a317e9..0000000
--- a/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.tool;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RMHAUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JobTaskCounterExtractor extends AbstractInfoExtractor {
-    private String mrJobId;
-    private String yarnUrl;
-    private static final Logger logger = LoggerFactory.getLogger(JobTaskCounterExtractor.class);
-
-    private final int HTTP_RETRY = 3;
-
-    public JobTaskCounterExtractor(String mrJobId) {
-        this.mrJobId = mrJobId;
-        this.yarnUrl = getRestCheckUrl();
-    }
-
-    private String getRestCheckUrl() {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        Pattern pattern = Pattern.compile("(http://)(.*):.*");
-        if (yarnStatusCheckUrl != null) {
-            Matcher m = pattern.matcher(yarnStatusCheckUrl);
-            m.matches();
-            yarnUrl = m.group(1) + m.group(2) + ":19888";
-            return yarnUrl;
-        } else {
-            logger.info("kylin.job.yarn.app.rest.check.status.url" + " is not set read from hadoop configuration");
-        }
-        Configuration conf = HadoopUtil.getCurrentConfiguration();
-        String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
-        if (HAUtil.isHAEnabled(conf)) {
-            YarnConfiguration yarnConf = new YarnConfiguration(conf);
-            String active = RMHAUtils.findActiveRMHAId(yarnConf);
-            rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
-        }
-        if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
-        }
-        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
-            //do nothing
-        } else {
-            rmWebHost = "http://" + rmWebHost;
-        }
-        Matcher m = pattern.matcher(rmWebHost);
-        m.matches();
-        return m.group(1) + m.group(2) + ":19888";
-    }
-
-    private String getHttpResponse(String url) {
-        HttpClient client = new HttpClient();
-        String response = null;
-        int retry_times = 0;
-        while (response == null && retry_times < HTTP_RETRY) {
-            retry_times++;
-
-            HttpMethod get = new GetMethod(url);
-            try {
-                get.addRequestHeader("accept", "application/json");
-                client.executeMethod(get);
-                response = get.getResponseBodyAsString();
-            } catch (Exception e) {
-                logger.warn("Failed to fetch http response. Retry={}", retry_times, e);
-            } finally {
-                get.releaseConnection();
-            }
-        }
-        return response;
-    }
-
-    protected void executeExtract(File exportDir) {
-        try {
-            String taskUrl = yarnUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId + "/tasks/";
-            String tasksResponse = getHttpResponse(taskUrl);
-            JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
-
-            String maxReduceId = null;
-            String maxMapId = null;
-            long maxMapElapsedTime = 0L;
-            long maxReduceElapsedTime = 0L;
-
-            for (JsonNode node : tasks) {
-                if (node.get("type").textValue().equals("MAP")) {
-                    if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
-                        maxMapElapsedTime = node.get("elapsedTime").longValue();
-                        maxMapId = node.get("id").textValue();
-                    }
-                }
-                if (node.get("type").textValue().equals("REDUCE")) {
-                    if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) {
-                        maxReduceElapsedTime = node.get("elapsedTime").longValue();
-                        maxReduceId = node.get("id").textValue();
-                    }
-                }
-            }
-            extractTaskCounterFile(maxMapId, exportDir, taskUrl);
-            extractTaskCounterFile(maxReduceId, exportDir, taskUrl);
-        } catch (Exception e) {
-            logger.warn("Failed to get mr tasks rest response" + e);
-        }
-    }
-
-    private void extractTaskCounterFile(String taskId, File exportDir, String taskUrl) throws IOException {
-        try {
-            String response = getHttpResponse(taskUrl + taskId + "/counters");
-            FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response, Charset.defaultCharset());
-        } catch (Exception e) {
-            logger.warn("Failed to get task counters rest response" + e);
-        }
-    }
-
-    @Override
-    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
-        executeExtract(exportDir);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/22069740/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
new file mode 100644
index 0000000..056c210
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.RMHAUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class MrJobInfoExtractor extends AbstractInfoExtractor {
+    private String mrJobId;
+    private String jobUrlPrefix;
+
+    private static final Logger logger = LoggerFactory.getLogger(MrJobInfoExtractor.class);
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default false.").create("includeCounters");
+
+    private final int HTTP_RETRY = 3;
+
+    public MrJobInfoExtractor(String mrJobId) {
+        this.mrJobId = mrJobId;
+        String historyServerUrl = getRestCheckUrl();
+        this.jobUrlPrefix = historyServerUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+    }
+
+    private String getRestCheckUrl() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+        Pattern pattern = Pattern.compile("(http://)(.*):.*");
+        if (yarnStatusCheckUrl != null) {
+            Matcher m = pattern.matcher(yarnStatusCheckUrl);
+            if (m.matches()) {
+                return m.group(1) + m.group(2) + ":19888";
+            }
+        }
+        logger.info("kylin.job.yarn.app.rest.check.status.url" + " is not set read from hadoop configuration");
+
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
+        if (HAUtil.isHAEnabled(conf)) {
+            YarnConfiguration yarnConf = new YarnConfiguration(conf);
+            String active = RMHAUtils.findActiveRMHAId(yarnConf);
+            rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
+        }
+        if (StringUtils.isEmpty(rmWebHost)) {
+            return null;
+        }
+        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
+            //do nothing
+        } else {
+            rmWebHost = "http://" + rmWebHost;
+        }
+        Matcher m = pattern.matcher(rmWebHost);
+        m.matches();
+        return m.group(1) + m.group(2) + ":19888";
+    }
+
+    private String getHttpResponse(String url) {
+        HttpClient client = new HttpClient();
+        String response = null;
+        int retry_times = 0;
+        while (response == null && retry_times < HTTP_RETRY) {
+            retry_times++;
+
+            HttpMethod get = new GetMethod(url);
+            try {
+                get.addRequestHeader("accept", "application/json");
+                client.executeMethod(get);
+                response = get.getResponseBodyAsString();
+            } catch (Exception e) {
+                logger.warn("Failed to fetch http response. Retry={}", retry_times, e);
+            } finally {
+                get.releaseConnection();
+            }
+        }
+        return response;
+    }
+
+    private void extractTaskCounter(String taskId, File exportDir, String taskUrl) throws IOException {
+        try {
+            String response = getHttpResponse(taskUrl + taskId + "/counters");
+            FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response, Charset.defaultCharset());
+        } catch (Exception e) {
+            logger.warn("Failed to get task counters rest response" + e);
+        }
+    }
+
+    private void extractJobConf(File exportDir) throws IOException {
+        try {
+            String jobResponse = getHttpResponse(jobUrlPrefix);
+            JsonNode job = new ObjectMapper().readTree(jobResponse).path("job").get("state");
+            String state = job.textValue();
+            logger.debug(state);
+            if (!state.equals("SUCCEEDED")) {
+                String confUrl = jobUrlPrefix + "/conf/";
+                String response = getHttpResponse(confUrl);
+                FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), response, Charset.defaultCharset());
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to get job conf rest response.", e);
+        }
+    }
+
+    @Override
+    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
+        try {
+            boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : false;
+            if (includeTaskCounter) {
+                extractTaskCounters(exportDir);
+            }
+            extractJobConf(exportDir);
+        } catch (Exception e) {
+            logger.warn("Failed to get mr tasks rest response.", e);
+        }
+    }
+
+    private void extractTaskCounters(File exportDir) {
+        try {
+            String tasksUrl = jobUrlPrefix + "/tasks/";
+            String tasksResponse = getHttpResponse(tasksUrl);
+            JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
+
+            String maxReduceId = null;
+            String maxMapId = null;
+            long maxMapElapsedTime = 0L;
+            long maxReduceElapsedTime = 0L;
+
+            for (JsonNode node : tasks) {
+                if (node.get("type").textValue().equals("MAP")) {
+                    if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
+                        maxMapElapsedTime = node.get("elapsedTime").longValue();
+                        maxMapId = node.get("id").textValue();
+                    }
+                }
+                if (node.get("type").textValue().equals("REDUCE")) {
+                    if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) {
+                        maxReduceElapsedTime = node.get("elapsedTime").longValue();
+                        maxReduceId = node.get("id").textValue();
+                    }
+                }
+            }
+            File counterDir = new File(exportDir, "counters");
+            FileUtils.forceMkdir(counterDir);
+            extractTaskCounter(maxMapId, counterDir, tasksUrl);
+            extractTaskCounter(maxReduceId, counterDir, tasksUrl);
+        } catch (Exception e) {
+            logger.warn("Failed to get mr tasks rest response" + e);
+        }
+    }
+}