KYLIN-1741 Diagnosis logs to kylin.log
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8a34d3c3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8a34d3c3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8a34d3c3 Branch: refs/heads/1.5.x-CDH5.7 Commit: 8a34d3c31808a6070a24a465ddd306879c8e70fb Parents: 3ad49dd Author: lidongsjtu <lid...@apache.org> Authored: Tue Jul 12 15:03:41 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Tue Jul 12 18:22:08 2016 +0800 ---------------------------------------------------------------------- build/bin/diag.sh | 23 ++- .../kylin/tool/AbstractInfoExtractor.java | 2 + .../org/apache/kylin/tool/DiagnosisInfoCLI.java | 14 +- .../apache/kylin/tool/JobInstanceExtractor.java | 182 +++++++++++++++++++ 4 files changed, 214 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8a34d3c3/build/bin/diag.sh ---------------------------------------------------------------------- diff --git a/build/bin/diag.sh b/build/bin/diag.sh index 2ddfa80..22805f9 100644 --- a/build/bin/diag.sh +++ b/build/bin/diag.sh @@ -24,6 +24,9 @@ dir=$(dirname ${0}) export KYLIN_HOME=${dir}/../ source ${dir}/check-env.sh +tomcat_root=${dir}/../tomcat +export tomcat_root + if [ $# -eq 1 ] || [ $# -eq 2 ] then patient="$1" @@ -52,9 +55,9 @@ then diagJar=`ls ${KYLIN_HOME}/tool/kylin-tool-*.jar` if [ -f "${diagJar}" ]; then if [ -f "${KYLIN_HOME}/commit_SHA1" ]; then - export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${diagJar}:${KYLIN_HOME}/lib/* + export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${diagJar}:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/conf else - export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${KYLIN_HOME}/lib/*:${diagJar} + export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${KYLIN_HOME}/lib/*:${diagJar}:${KYLIN_HOME}/conf fi else echo "missing diagnosis jar file." 
@@ -62,13 +65,23 @@ then fi if [ ${#patient} -eq 36 ]; then - exec hbase ${KYLIN_EXTRA_START_OPTS} -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.tool.JobDiagnosisInfoCLI -jobId $patient -destDir $destDir + hbase ${KYLIN_EXTRA_START_OPTS} \ + -Dlog4j.configuration=kylin-server-log4j.properties \ + -Dcatalina.home=${tomcat_root} \ + org.apache.kylin.tool.JobDiagnosisInfoCLI \ + -jobId $patient \ + -destDir $destDir else - exec hbase ${KYLIN_EXTRA_START_OPTS} -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.tool.DiagnosisInfoCLI -project -all -destDir $destDir + hbase ${KYLIN_EXTRA_START_OPTS} \ + -Dlog4j.configuration=kylin-server-log4j.properties \ + -Dcatalina.home=${tomcat_root} \ + org.apache.kylin.tool.DiagnosisInfoCLI \ + -project -all \ + -destDir $destDir fi exit 0 else echo "usage: diag.sh Project|JobId [target_path]" exit 1 -fi +fi \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/8a34d3c3/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java index a3bf24a..f2d6102 100644 --- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java @@ -90,7 +90,9 @@ public abstract class AbstractInfoExtractor extends AbstractApplication { if (!isSubmodule && new File(exportDest).exists()) { exportDest = exportDest + packageName + "/"; } + exportDir = new File(exportDest); + FileUtils.forceMkdir(exportDir); if (!isSubmodule) { dumpBasicDiagInfo(); http://git-wip-us.apache.org/repos/asf/kylin/blob/8a34d3c3/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java ---------------------------------------------------------------------- diff --git 
a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java index 495cc5b..cf563a5 100644 --- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java @@ -53,7 +53,7 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { private static final Option OPTION_INCLUDE_CLIENT = OptionBuilder.withArgName("includeClient").hasArg().isRequired(false).withDescription("Specify whether to include client info to extract. Default true.").create("includeClient"); @SuppressWarnings("static-access") - private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Specify whether to include job output to extract. Default true.").create("includeJobs"); + private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Specify whether to include job info to extract. 
Default true.").create("includeJobs"); public DiagnosisInfoCLI() { super(); @@ -82,6 +82,9 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { } else { result.add(projectSeed); } + if (result.isEmpty()) { + throw new RuntimeException("No project to extract."); + } return result; } @@ -95,11 +98,18 @@ public class DiagnosisInfoCLI extends AbstractInfoExtractor { String projectNames = StringUtils.join(getProjects(projectInput), ","); // export cube metadata - String[] cubeMetaArgs = { "-destDir", new File(exportDir, "metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-includeJobs", Boolean.toString(includeJob), "-submodule", "true" }; + String[] cubeMetaArgs = { "-destDir", new File(exportDir, "metadata").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-includeJobs", "false", "-submodule", "true" }; CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor(); logger.info("CubeMetaExtractor args: " + Arrays.toString(cubeMetaArgs)); cubeMetaExtractor.execute(cubeMetaArgs); + // extract all job instances + if (includeJob) { + String[] jobArgs = { "-destDir", new File(exportDir, "jobs").getAbsolutePath(), "-compress", "false", "-submodule", "true" }; + JobInstanceExtractor jobInstanceExtractor = new JobInstanceExtractor(); + jobInstanceExtractor.execute(jobArgs); + } + // export HBase if (includeHBase) { String[] hbaseArgs = { "-destDir", new File(exportDir, "hbase").getAbsolutePath(), "-project", projectNames, "-compress", "false", "-submodule", "true" }; http://git-wip-us.apache.org/repos/asf/kylin/blob/8a34d3c3/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java new file mode 100644 index 0000000..5ad4953 --- /dev/null +++ 
b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Dumps recent cubing jobs (optionally filtered by project and/or cube) as one
+ * {@code <jobId>.json} file per job into the export directory.
+ */
+public class JobInstanceExtractor extends AbstractInfoExtractor {
+    private static final Logger logger = LoggerFactory.getLogger(JobInstanceExtractor.class);
+
+    /** Length of the time window (ending now) passed to getAllExecutables: 3 days in ms. */
+    private static final long LOOKBACK_MILLIS = 3L * 24 * 3600 * 1000;
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify jobs in which project to extract").create("project");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify jobs related to which cube to extract").create("cube");
+
+    KylinConfig config;
+    ProjectManager projectManager;
+    ExecutableManager executableManager;
+
+    public JobInstanceExtractor() {
+        config = KylinConfig.getInstanceFromEnv();
+        executableManager = ExecutableManager.getInstance(config);
+        projectManager = ProjectManager.getInstance(config);
+
+        packageType = "jobInstances";
+
+        options.addOption(OPTION_PROJECT);
+        options.addOption(OPTION_CUBE);
+    }
+
+    /**
+     * Writes every matching job from the last three days to {@code <exportDir>/<jobId>.json}.
+     */
+    @Override
+    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
+        String cube = optionsHelper.hasOption(OPTION_CUBE) ? optionsHelper.getOptionValue(OPTION_CUBE) : null;
+        String project = optionsHelper.hasOption(OPTION_PROJECT) ? optionsHelper.getOptionValue(OPTION_PROJECT) : null;
+
+        long endTime = System.currentTimeMillis();
+        long startTime = endTime - LOOKBACK_MILLIS;
+        // listJobInstances takes (project, cube, ...): keep the argument order in sync.
+        List<JobInstance> jobInstances = listJobInstances(project, cube, startTime, endTime);
+        logger.info("There are {} jobInstances to extract.", jobInstances.size());
+
+        ObjectMapper mapper = new ObjectMapper();
+        for (JobInstance jobInstance : jobInstances) {
+            mapper.writeValue(new File(exportDir, jobInstance.getUuid() + ".json"), jobInstance);
+        }
+    }
+
+    /**
+     * Collects the cubing jobs in the given time window that pass the optional filters.
+     *
+     * @param project keep only jobs whose cube belongs to this project; null keeps all projects
+     * @param cube keep only jobs of this cube (case-insensitive); null keeps all cubes
+     * @param startTime start of the window passed to ExecutableManager.getAllExecutables
+     * @param endTime end of the window passed to ExecutableManager.getAllExecutables
+     * @return the converted jobs; jobs that fail to convert are logged and skipped
+     */
+    private List<JobInstance> listJobInstances(String project, String cube, long startTime, long endTime) {
+        final List<JobInstance> result = Lists.newArrayList();
+        final List<AbstractExecutable> executables = executableManager.getAllExecutables(startTime, endTime);
+        final Map<String, Output> allOutputs = executableManager.getAllOutputs();
+        // Resolve the project once instead of once per job; an unknown project matches no job.
+        final ProjectInstance projectInstance = project == null ? null : projectManager.getProject(project);
+        for (AbstractExecutable executable : executables) {
+            if (!(executable instanceof CubingJob)) {
+                continue;
+            }
+            String cubeName = CubingExecutableUtil.getCubeName(executable.getParams());
+            if (cube != null && !cube.equalsIgnoreCase(cubeName)) {
+                continue;
+            }
+            if (project != null && (projectInstance == null || !projectInstance.containsRealization(RealizationType.CUBE, cubeName))) {
+                continue;
+            }
+            try {
+                result.add(parseToJobInstance((CubingJob) executable, allOutputs));
+            } catch (RuntimeException e) {
+                // Diagnosis is best-effort: one unconvertible job (e.g. one in a state the
+                // status mapping rejects) must not abort the whole extraction.
+                logger.warn("Failed to extract job {}, skipping it.", executable.getId(), e);
+            }
+        }
+        return result;
+    }
+
+    /** Converts a cubing job and its recorded outputs into a serializable {@link JobInstance}. */
+    private JobInstance parseToJobInstance(CubingJob cubeJob, Map<String, Output> outputs) {
+        Output output = outputs.get(cubeJob.getId());
+        Preconditions.checkNotNull(output, "No output found for job %s", cubeJob.getId());
+        final JobInstance result = new JobInstance();
+        result.setName(cubeJob.getName());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
+        result.setLastModified(output.getLastModified());
+        result.setSubmitter(cubeJob.getSubmitter());
+        result.setUuid(cubeJob.getId());
+        // NOTE(review): the type is always BUILD whatever kind of cubing job this is -- confirm.
+        result.setType(CubeBuildTypeEnum.BUILD);
+        result.setStatus(parseToJobStatus(output.getState()));
+        result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
+        result.setDuration(AbstractExecutable.getDuration(AbstractExecutable.getStartTime(output), AbstractExecutable.getEndTime(output)) / 1000);
+        for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
+            AbstractExecutable task = cubeJob.getTasks().get(i);
+            result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
+        }
+        return result;
+    }
+
+    /** Maps an executable state to its JobStatusEnum; STOPPED (or any unlisted state) throws. */
+    private JobStatusEnum parseToJobStatus(ExecutableState state) {
+        switch (state) {
+        case READY:
+            return JobStatusEnum.PENDING;
+        case RUNNING:
+            return JobStatusEnum.RUNNING;
+        case ERROR:
+            return JobStatusEnum.ERROR;
+        case DISCARDED:
+            return JobStatusEnum.DISCARDED;
+        case SUCCEED:
+            return JobStatusEnum.FINISHED;
+        case STOPPED:
+        default:
+            throw new RuntimeException("invalid state:" + state);
+        }
+    }
+
+    /**
+     * Converts one task of a cubing job into a {@link JobInstance.JobStep}.
+     *
+     * @param task the sub-task to convert
+     * @param i index of the task within the job, stored as the step sequence id
+     * @param stepOutput recorded output of the task; must not be null
+     */
+    private JobInstance.JobStep parseToJobStep(AbstractExecutable task, int i, Output stepOutput) {
+        Preconditions.checkNotNull(stepOutput);
+        JobInstance.JobStep result = new JobInstance.JobStep();
+        result.setId(task.getId());
+        result.setName(task.getName());
+        result.setSequenceID(i);
+        result.setStatus(parseToJobStepStatus(stepOutput.getState()));
+        // Copy the extra info, skipping entries with a null key or value.
+        for (Map.Entry<String, String> entry : stepOutput.getExtra().entrySet()) {
+            if (entry.getKey() != null && entry.getValue() != null) {
+                result.putInfo(entry.getKey(), entry.getValue());
+            }
+        }
+        result.setExecStartTime(AbstractExecutable.getStartTime(stepOutput));
+        result.setExecEndTime(AbstractExecutable.getEndTime(stepOutput));
+        if (task instanceof ShellExecutable) {
+            result.setExecCmd(((ShellExecutable) task).getCmd());
+        }
+        if (task instanceof MapReduceExecutable) {
+            result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams());
+            result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
+        }
+        if (task instanceof HadoopShellExecutable) {
+            result.setExecCmd(((HadoopShellExecutable) task).getJobParams());
+        }
+        return result;
+    }
+
+    /** Maps an executable state to its JobStepStatusEnum; STOPPED (or any unlisted state) throws. */
+    private JobStepStatusEnum parseToJobStepStatus(ExecutableState state) {
+        switch (state) {
+        case READY:
+            return JobStepStatusEnum.PENDING;
+        case RUNNING:
+            return JobStepStatusEnum.RUNNING;
+        case ERROR:
+            return JobStepStatusEnum.ERROR;
+        case DISCARDED:
+            return JobStepStatusEnum.DISCARDED;
+        case SUCCEED:
+            return JobStepStatusEnum.FINISHED;
+        case STOPPED:
+        default:
+            throw new RuntimeException("invalid state:" + state);
+        }
+    }
+}