Repository: kylin Updated Branches: refs/heads/master 1e8b6a5e7 -> 9250d9baa
Minor, use util class to hold static job function Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9e03c718 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9e03c718 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9e03c718 Branch: refs/heads/master Commit: 9e03c718f86725e7b5d0b96950d8372b7bb77428 Parents: b1cc0dd Author: Yifan Zhang <event.dim...@gmail.com> Authored: Mon Mar 27 10:50:12 2017 +0800 Committer: Dong Li <lid...@apache.org> Committed: Mon Mar 27 15:50:53 2017 +0800 ---------------------------------------------------------------------- .gitignore | 4 +- .../apache/kylin/rest/service/JobService.java | 103 +-------------- .../kylin/rest/util/JobInfoConverter.java | 130 +++++++++++++++++++ 3 files changed, 136 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/9e03c718/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index bb8b3b0..3679a13 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,7 @@ release.properties #IDEA *.iml -#.settings +.settings # External tool builders .externalToolBuilders/ @@ -88,4 +88,4 @@ build/commit_SHA1 dist/ tomcat/ webapp/app/kylin-servlet.xml -webapp/app/web.xml \ No newline at end of file +webapp/app/web.xml http://git-wip-us.apache.org/repos/asf/kylin/blob/9e03c718/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 9836766..1e91b43 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -37,15 +37,11 @@ import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.common.HadoopShellExecutable; -import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.SchedulerFactory; -import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.JobStatusEnum; -import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.JobException; @@ -59,6 +55,7 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; +import org.apache.kylin.rest.util.JobInfoConverter; import org.apache.kylin.source.ISource; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; @@ -275,108 +272,16 @@ public class JobService extends BasicService implements InitializingBean { result.setSubmitter(cubeJob.getSubmitter()); result.setUuid(cubeJob.getId()); result.setType(CubeBuildTypeEnum.BUILD); - result.setStatus(parseToJobStatus(job.getStatus())); + result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus())); result.setMrWaiting(cubeJob.getMapReduceWaitTime() / 1000); result.setDuration(cubeJob.getDuration() / 1000); for (int i = 0; i < cubeJob.getTasks().size(); ++i) { AbstractExecutable task = cubeJob.getTasks().get(i); - result.addStep(parseToJobStep(task, i, getExecutableManager().getOutput(task.getId()))); + result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId()))); } return result; } - private JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) { - if (job == null) { - return null; - } - Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); - CubingJob cubeJob = (CubingJob) job; - Output output = outputs.get(job.getId()); - final JobInstance result = new JobInstance(); - result.setName(job.getName()); - result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); - result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); - result.setLastModified(output.getLastModified()); - result.setSubmitter(cubeJob.getSubmitter()); - result.setUuid(cubeJob.getId()); - result.setType(CubeBuildTypeEnum.BUILD); - result.setStatus(parseToJobStatus(output.getState())); - result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); - result.setExecStartTime(AbstractExecutable.getStartTime(output)); - result.setExecEndTime(AbstractExecutable.getEndTime(output)); - result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime()) / 1000); - for (int i = 0; i < cubeJob.getTasks().size(); ++i) { - AbstractExecutable task = cubeJob.getTasks().get(i); - result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); - } - return result; - } - - private JobInstance.JobStep parseToJobStep(AbstractExecutable task, int i, Output stepOutput) { - Preconditions.checkNotNull(stepOutput); - JobInstance.JobStep result = new JobInstance.JobStep(); - result.setId(task.getId()); - result.setName(task.getName()); - result.setSequenceID(i); - result.setStatus(parseToJobStepStatus(stepOutput.getState())); - for (Map.Entry<String, String> entry : stepOutput.getExtra().entrySet()) { - if (entry.getKey() != null && entry.getValue() != null) { - result.putInfo(entry.getKey(), entry.getValue()); - } - } - result.setExecStartTime(AbstractExecutable.getStartTime(stepOutput)); - result.setExecEndTime(AbstractExecutable.getEndTime(stepOutput)); - if (task instanceof ShellExecutable) { - result.setExecCmd(((ShellExecutable) task).getCmd()); - } - if (task instanceof MapReduceExecutable) { - result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); - result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); - } - if (task instanceof HadoopShellExecutable) { - result.setExecCmd(((HadoopShellExecutable) task).getJobParams()); - } - return result; - } - - private JobStatusEnum parseToJobStatus(ExecutableState state) { - switch (state) { - case READY: - return JobStatusEnum.PENDING; - case RUNNING: - return JobStatusEnum.RUNNING; - case ERROR: - return JobStatusEnum.ERROR; - case DISCARDED: - return JobStatusEnum.DISCARDED; - case SUCCEED: - return JobStatusEnum.FINISHED; - case STOPPED: - return JobStatusEnum.STOPPED; - default: - throw new RuntimeException("invalid state:" + state); - } - } - - private JobStepStatusEnum parseToJobStepStatus(ExecutableState state) { - switch (state) { - case READY: - return JobStepStatusEnum.PENDING; - case RUNNING: - return JobStepStatusEnum.RUNNING; - case ERROR: - return JobStepStatusEnum.ERROR; - case DISCARDED: - return JobStepStatusEnum.DISCARDED; - case SUCCEED: - return JobStepStatusEnum.FINISHED; - case STOPPED: - return JobStepStatusEnum.STOPPED; - default: - throw new RuntimeException("invalid state:" + state); - } - } - @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public void resumeJob(JobInstance job) throws IOException, JobException { getExecutableManager().resumeJob(job.getId()); @@ -448,7 +353,7 @@ public class JobService extends BasicService implements InitializingBean { return Lists.newArrayList(FluentIterable.from(searchCubingJobs(cubeNameSubstring, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs, false)).transform(new Function<CubingJob, JobInstance>() { @Override public JobInstance apply(CubingJob cubingJob) { - return parseToJobInstance(cubingJob, allOutputs); + return JobInfoConverter.parseToJobInstance(cubingJob, allOutputs); } })); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9e03c718/server-base/src/main/java/org/apache/kylin/rest/util/JobInfoConverter.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/JobInfoConverter.java b/server-base/src/main/java/org/apache/kylin/rest/util/JobInfoConverter.java new file mode 100644 index 0000000..e13c9c3 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/JobInfoConverter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.Map; + +import org.apache.kylin.cube.model.CubeBuildTypeEnum; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; + +import com.google.common.base.Preconditions; + +public class JobInfoConverter { + public static JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) { + if (job == null) { + return null; + } + Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); + CubingJob cubeJob = (CubingJob) job; + Output output = outputs.get(job.getId()); + final JobInstance result = new JobInstance(); + result.setName(job.getName()); + result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); + result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); + result.setLastModified(output.getLastModified()); + result.setSubmitter(cubeJob.getSubmitter()); + result.setUuid(cubeJob.getId()); + result.setType(CubeBuildTypeEnum.BUILD); + result.setStatus(parseToJobStatus(output.getState())); + result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); + result.setExecStartTime(AbstractExecutable.getStartTime(output)); + result.setExecEndTime(AbstractExecutable.getEndTime(output)); + result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime()) / 1000); + for (int i = 0; i < cubeJob.getTasks().size(); ++i) { + AbstractExecutable task = cubeJob.getTasks().get(i); + result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); + } + return result; + } + + public static JobInstance.JobStep parseToJobStep(AbstractExecutable task, int i, Output stepOutput) { + Preconditions.checkNotNull(stepOutput); + JobInstance.JobStep result = new JobInstance.JobStep(); + result.setId(task.getId()); + result.setName(task.getName()); + result.setSequenceID(i); + result.setStatus(parseToJobStepStatus(stepOutput.getState())); + for (Map.Entry<String, String> entry : stepOutput.getExtra().entrySet()) { + if (entry.getKey() != null && entry.getValue() != null) { + result.putInfo(entry.getKey(), entry.getValue()); + } + } + result.setExecStartTime(AbstractExecutable.getStartTime(stepOutput)); + result.setExecEndTime(AbstractExecutable.getEndTime(stepOutput)); + if (task instanceof ShellExecutable) { + result.setExecCmd(((ShellExecutable) task).getCmd()); + } + if (task instanceof MapReduceExecutable) { + result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); + result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); + } + if (task instanceof HadoopShellExecutable) { + result.setExecCmd(((HadoopShellExecutable) task).getJobParams()); + } + return result; + } + + public static JobStatusEnum parseToJobStatus(ExecutableState state) { + switch (state) { + case READY: + return JobStatusEnum.PENDING; + case RUNNING: + return JobStatusEnum.RUNNING; + case ERROR: + return JobStatusEnum.ERROR; + case DISCARDED: + return JobStatusEnum.DISCARDED; + case SUCCEED: + return JobStatusEnum.FINISHED; + case STOPPED: + return JobStatusEnum.STOPPED; + default: + throw new RuntimeException("invalid state:" + state); + } + } + + public static JobStepStatusEnum parseToJobStepStatus(ExecutableState state) { + switch (state) { + case READY: + return JobStepStatusEnum.PENDING; + case RUNNING: + return JobStepStatusEnum.RUNNING; + case ERROR: + return JobStepStatusEnum.ERROR; + case DISCARDED: + return JobStepStatusEnum.DISCARDED; + case SUCCEED: + return JobStepStatusEnum.FINISHED; + case STOPPED: + return JobStepStatusEnum.STOPPED; + default: + throw new RuntimeException("invalid state:" + state); + } + } +}