Repository: kylin Updated Branches: refs/heads/master 53b5a6d8e -> 798f03ed2
KYLIN-227 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/798f03ed Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/798f03ed Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/798f03ed Branch: refs/heads/master Commit: 798f03ed2eaaddaee5930ff074a022655fec51ff Parents: 53b5a6d Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Nov 11 16:11:55 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Nov 24 09:11:33 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/constant/JobStatusEnum.java | 2 +- .../kylin/job/constant/JobStepStatusEnum.java | 2 +- .../kylin/job/execution/AbstractExecutable.java | 5 +++ .../job/execution/DefaultChainedExecutable.java | 6 ++++ .../kylin/job/execution/ExecutableManager.java | 11 +++++- .../kylin/job/execution/ExecutableState.java | 7 ++++ .../job/impl/threadpool/DefaultScheduler.java | 6 ++-- .../apache/kylin/job/ExecutableManagerTest.java | 2 +- .../kylin/rest/controller/JobController.java | 22 ++++++++++++ .../apache/kylin/rest/service/CubeService.java | 2 +- .../apache/kylin/rest/service/JobService.java | 15 ++++++++ webapp/app/js/controllers/job.js | 38 +++++++++++++++++++- webapp/app/js/model/jobConfig.js | 1 + webapp/app/js/services/jobs.js | 3 +- webapp/app/partials/jobs/jobList.html | 10 +++++- 15 files changed, 122 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java index a4ef564..4c6ac97 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java @@ -20,7 +20,7 @@ package org.apache.kylin.job.constant; public enum JobStatusEnum { - NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16); + NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), STOPPED(32); private final int code; http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java index 08ee79a..08cd138 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java @@ -19,7 +19,7 @@ package org.apache.kylin.job.constant; public enum JobStepStatusEnum { - NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64); + NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64), STOPPED(128); private final int code; http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 80a92de..551241b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -380,6 +380,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return status == ExecutableState.DISCARDED; } + protected final boolean isPaused() { + final ExecutableState status = getOutput().getState(); + return status == ExecutableState.STOPPED; + } + protected boolean needRetry() { return this.retry <= config.getJobRetry(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index fccab30..253072e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -54,6 +54,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai if (state == ExecutableState.RUNNING) { // there is already running subtask, no need to start a new subtask break; + } else if (state == ExecutableState.STOPPED) { + // the job is paused + break; } else if (state == ExecutableState.ERROR) { throw new IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus()); } @@ -89,6 +92,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai if (isDiscarded()) { setEndTime(System.currentTimeMillis()); notifyUserStatusChange(executableContext, ExecutableState.DISCARDED); + } else if (isPaused()) { + setEndTime(System.currentTimeMillis()); + notifyUserStatusChange(executableContext, ExecutableState.STOPPED); } else if (result.succeed()) { List<? extends Executable> jobs = getTasks(); boolean allSucceed = true; http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 0e39ad1..52d4d1c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -266,7 +266,7 @@ public class ExecutableManager { if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { - if (task.getStatus() == ExecutableState.ERROR) { + if (task.getStatus() == ExecutableState.ERROR || task.getStatus() == ExecutableState.STOPPED) { updateJobOutput(task.getId(), ExecutableState.READY, null, null); break; } @@ -292,6 +292,15 @@ public class ExecutableManager { updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); } + public void pauseJob(String jobId) { + AbstractExecutable job = getJob(jobId); + if (job == null) { + return; + } + + updateJobOutput(jobId, ExecutableState.STOPPED, null, null); + } + public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { try { final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java index a2f0a69..0684eff 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java @@ -63,6 +63,13 @@ public enum ExecutableState { VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.DISCARDED); VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.READY); + + + VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.STOPPED); + VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.STOPPED); + + + } public boolean isFinalState() { http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 9d5f7ba..be8e7fe 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -81,7 +81,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti return; } - int nRunning = 0, nReady = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; for (final String id : executableManager.getAllJobIds()) { if (runningJobs.containsKey(id)) { // logger.debug("Job id:" + id + " is already running"); @@ -97,6 +97,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti nError++; } else if (output.getState() == ExecutableState.SUCCEED) { nSUCCEED++; + } else if (output.getState() == ExecutableState.STOPPED) { + nStopped++; } else { nOthers++; } @@ -115,7 +117,7 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti logger.warn(jobDesc + " fail to schedule", ex); } } - logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + " others"); + logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nStopped + " stopped, " + nReady + " ready, " + nSUCCEED + " already succeed, " + nError + " error, " + nDiscarded + " discarded, " + nOthers + " others"); } catch (Exception e) { logger.warn("Job Fetcher caught a exception " + e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java index 2868f08..faea9a4 100644 --- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java @@ -109,7 +109,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { public void testInvalidStateTransfer() { SucceedTestExecutable job = new SucceedTestExecutable(); service.addJob(job); - service.updateJobOutput(job.getId(), ExecutableState.RUNNING, null, null); + service.updateJobOutput(job.getId(), ExecutableState.ERROR, null, null); service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null); } http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index 16b643c..1af4394 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -153,6 +153,28 @@ public class JobController extends BasicController { } + + + /** + * Pause a job + * + * @return + * @throws IOException + */ + @RequestMapping(value = "/{jobId}/pause", method = { RequestMethod.PUT }) + @ResponseBody + public JobInstance pause(@PathVariable String jobId) { + + try { + final JobInstance jobInstance = jobService.getJobInstance(jobId); + return jobService.pauseJob(jobInstance); + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + throw new InternalErrorException(e); + } + + } + public void setJobService(JobService jobService) { this.jobService = jobService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index f0371d1..5c59e1a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -534,7 +534,7 @@ public class CubeService extends BasicService { final List<CubingJob> cubingJobs = jobService.listAllCubingJobs(cube.getName(), null); for (CubingJob cubingJob : cubingJobs) { final ExecutableState status = cubingJob.getStatus(); - if (status != ExecutableState.SUCCEED && status != ExecutableState.STOPPED && status != ExecutableState.DISCARDED) { + if (status != ExecutableState.SUCCEED && status != ExecutableState.DISCARDED) { getExecutableManager().discardJob(cubingJob.getId()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index fa578fe..19902f0 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -257,6 +257,8 @@ public class JobService extends BasicService implements InitializingBean { return ExecutableState.READY; case RUNNING: return ExecutableState.RUNNING; + case STOPPED: + return ExecutableState.STOPPED; default: throw new RuntimeException("illegal status:" + status); } @@ -425,6 +427,7 @@ public class JobService extends BasicService implements InitializingBean { case SUCCEED: return JobStatusEnum.FINISHED; case STOPPED: + return JobStatusEnum.STOPPED; default: throw new RuntimeException("invalid state:" + state); } @@ -443,6 +446,7 @@ public class JobService extends BasicService implements InitializingBean { case SUCCEED: return JobStepStatusEnum.FINISHED; case STOPPED: + return JobStepStatusEnum.STOPPED; default: throw new RuntimeException("invalid state:" + state); } @@ -481,6 +485,17 @@ public class JobService extends BasicService implements InitializingBean { return job; } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") + public JobInstance pauseJob(JobInstance job) throws IOException, JobException { + getExecutableManager().pauseJob(job.getId()); + + //release the segment lock when discarded the job but the job hasn't scheduled + releaseSegmentLock(job.getRelatedSegment()); + + return job; + } + private void lockSegment(String segmentId) throws JobException { if (jobLock instanceof DistributedJobLock) { if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) { http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/webapp/app/js/controllers/job.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/controllers/job.js b/webapp/app/js/controllers/job.js index 05c1a87..e6aba7e 100644 --- a/webapp/app/js/controllers/job.js +++ b/webapp/app/js/controllers/job.js @@ -69,7 +69,7 @@ KylinApp }); $scope.cubeName=$scope.cubeName == ""?null:$scope.cubeName; - + var jobRequest = { cubeName: $scope.cubeName, projectName: $scope.state.projectName, @@ -179,6 +179,42 @@ KylinApp }); } + $scope.pause = function (job) { + SweetAlert.swal({ + title: '', + text: 'Are you sure to pause the job?', + type: '', + showCancelButton: true, + confirmButtonColor: '#DD6B55', + confirmButtonText: "Yes", + closeOnConfirm: true + }, function(isConfirm) { + if(isConfirm) { + loadingRequest.show(); + JobService.pause({jobId: job.uuid}, {}, function (job) { + loadingRequest.hide(); + $scope.safeApply(function() { + JobList.jobs[job.uuid] = job; + if (angular.isDefined($scope.state.selectedJob)) { + $scope.state.selectedJob = JobList.jobs[ $scope.state.selectedJob.uuid]; + } + + }); + SweetAlert.swal('Success!', 'Job has been paused successfully!', 'success'); + },function(e){ + loadingRequest.hide(); + if(e.data&& e.data.exception){ + var message =e.data.exception; + var msg = !!(message) ? message : 'Failed to take action.'; + SweetAlert.swal('Oops...', msg, 'error'); + }else{ + SweetAlert.swal('Oops...', "Failed to take action.", 'error'); + } + }); + } + }); + } + $scope.diagnosisJob =function(job) { if (!job){ SweetAlert.swal('', "No job selected.", 'info'); http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/webapp/app/js/model/jobConfig.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/model/jobConfig.js b/webapp/app/js/model/jobConfig.js index f4b918d..4548366 100644 --- a/webapp/app/js/model/jobConfig.js +++ b/webapp/app/js/model/jobConfig.js @@ -21,6 +21,7 @@ KylinApp.constant('jobConfig', { {name: 'NEW', value: 0}, {name: 'PENDING', value: 1}, {name: 'RUNNING', value: 2}, + {name: 'STOPPED', value: 32}, {name: 'FINISHED', value: 4}, {name: 'ERROR', value: 8}, {name: 'DISCARDED', value: 16} http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/webapp/app/js/services/jobs.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/services/jobs.js b/webapp/app/js/services/jobs.js index c0f297b..3619fbc 100644 --- a/webapp/app/js/services/jobs.js +++ b/webapp/app/js/services/jobs.js @@ -22,6 +22,7 @@ KylinApp.factory('JobService', ['$resource', function ($resource, config) { get: {method: 'GET', params: {}, isArray: false}, stepOutput: {method: 'GET', params: {propName: 'steps', action: 'output'}, isArray: false}, resume: {method: 'PUT', params: {action: 'resume'}, isArray: false}, - cancel: {method: 'PUT', params: {action: 'cancel'}, isArray: false} + cancel: {method: 'PUT', params: {action: 'cancel'}, isArray: false}, + pause: {method: 'PUT', params: {action: 'pause'}, isArray: false} }); }]); http://git-wip-us.apache.org/repos/asf/kylin/blob/798f03ed/webapp/app/partials/jobs/jobList.html ---------------------------------------------------------------------- diff --git a/webapp/app/partials/jobs/jobList.html b/webapp/app/partials/jobs/jobList.html index 98fb25d..d286022 100644 --- a/webapp/app/partials/jobs/jobList.html +++ b/webapp/app/partials/jobs/jobList.html @@ -114,6 +114,11 @@ type="info">{{job.progress | number:2}}% </progressbar> </div> + <div ng-switch-when="STOPPED" tooltip="STOPPED"> + <progressbar class="progress-striped" value="job.progress" + type="pending">{{job.progress | number:2}}% + </progressbar> + </div> <div ng-switch-when="DISCARDED" tooltip="DISCARDED"> <progressbar class="progress-striped" value="job.progress" type="inverse"> {{job.progress | number:2}}% @@ -130,7 +135,10 @@ Action <span class="ace-icon fa fa-caret-down icon-on-right"></span> </button> <ul class="dropdown-menu" role="menu"> - <li ng-if="job.job_status=='ERROR'"><a ng-click="resume(job)">Resume</a></li> + <li ng-if="job.job_status=='ERROR' || job.job_status=='STOPPED'"><a ng-click="resume(job)">Resume</a></li> + <li ng-if="job.job_status=='RUNNING' || job.job_status=='NEW' || job.job_status=='PENDING'"> + <a ng-click="pause(job)">Pause</a> + </li> <li ng-if="job.job_status=='RUNNING' || job.job_status=='NEW' || job.job_status=='PENDING' || job.job_status=='ERROR'"> <a ng-click="cancel(job)">Discard</a> </li>