This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 7ac5a84 KYLIN-3892 Set cubing job priority 7ac5a84 is described below commit 7ac5a8449621c191b9e1bcdeaa61a31e144e24a3 Author: Temple Zhou <dba...@gmail.com> AuthorDate: Tue Apr 9 17:31:44 2019 +0800 KYLIN-3892 Set cubing job priority --- .../org/apache/kylin/engine/EngineFactory.java | 6 ++++- .../apache/kylin/engine/IBatchCubingEngine.java | 2 +- .../org/apache/kylin/job/dao/ExecutablePO.java | 11 +++++++++ .../kylin/job/execution/AbstractExecutable.java | 16 +++++++++++++ .../kylin/job/execution/ExecutableManager.java | 2 ++ .../job/impl/threadpool/DefaultFetcherRunner.java | 25 +++++++------------ .../kylin/job/impl/threadpool/FetcherRunner.java | 24 +++++++++++++++++++ .../job/impl/threadpool/PriorityFetcherRunner.java | 28 +++++++--------------- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 8 +++++-- .../apache/kylin/engine/mr/JobBuilderSupport.java | 6 +++++ .../kylin/engine/mr/MRBatchCubingEngine2.java | 4 ++-- .../engine/spark/SparkBatchCubingEngine2.java | 4 ++-- .../engine/spark/SparkBatchCubingJobBuilder2.java | 10 +++++++- .../kylin/rest/controller/CubeController.java | 8 +++---- .../apache/kylin/rest/request/JobBuildRequest.java | 10 ++++++++ .../kylin/rest/request/JobBuildRequest2.java | 9 +++++++ .../org/apache/kylin/rest/service/JobService.java | 10 ++++---- .../org/apache/kylin/tool/job/CubeBuildingCLI.java | 4 ++-- 18 files changed, 131 insertions(+), 56 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 734c470..63eb09a 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -54,7 +54,11 @@ public class EngineFactory { /** Build a new cube segment, typically its time range appends to the end of current cube. */ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { - return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter); + return createBatchCubingJob(newSegment, submitter, 0); + } + + public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) { + return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset); } /** Merge multiple small segments into a big one. */ diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java index a618eac..d259a0e 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java +++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java @@ -32,7 +32,7 @@ public interface IBatchCubingEngine { public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment); /** Build a new cube segment, typically its time range appends to the end of current cube. */ - public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter); + public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset); /** Merge multiple small segments into a big one. */ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter); diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java index f48c876..1238b66 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java @@ -48,6 +48,9 @@ public class ExecutablePO extends RootPersistentEntity { @JsonProperty("params") private Map<String, String> params = Maps.newHashMap(); + @JsonProperty("priority") + private Integer priority; + public String getName() { return name; } @@ -88,4 +91,12 @@ public class ExecutablePO extends RootPersistentEntity { this.params = params; } + public Integer getPriority() { + return priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 8e1b262..51d9bf7 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -70,6 +70,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { private String id; private AbstractExecutable parentExecutable = null; private Map<String, String> params = Maps.newHashMap(); + protected Integer priority; public AbstractExecutable() { setId(RandomUtil.randomUUID().toString()); @@ -491,6 +492,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return DEFAULT_PRIORITY; } + public Integer getPriority() { + return priority == null ? getDefaultPriority() : priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + /** + * The different jobs have different default priorities. + * */ + public void setPriorityBasedOnPriorityOffset(Integer priorityOffset) { + this.priority = getDefaultPriority() + (priorityOffset == null ? 0 : priorityOffset); + } + /* * discarded is triggered by JobService, the Scheduler is not awake of that * diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index b1244b5..5837bd5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -77,6 +77,7 @@ public class ExecutableManager { result.setUuid(executable.getId()); result.setType(executable.getClass().getName()); result.setParams(executable.getParams()); + result.setPriority(executable.getPriority()); if (executable instanceof ChainedExecutable) { List<ExecutablePO> tasks = Lists.newArrayList(); for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) { @@ -569,6 +570,7 @@ public class ExecutableManager { result.setId(executablePO.getUuid()); result.setName(executablePO.getName()); result.setParams(executablePO.getParams()); + result.setPriority(executablePO.getPriority()); if (!(result instanceof BrokenExecutable)) { List<ExecutablePO> tasks = executablePO.getTasks(); diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java index 21cd8e9..04e40a5 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultFetcherRunner.java @@ -47,7 +47,13 @@ public class DefaultFetcherRunner extends FetcherRunner { return; } - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + nRunning = 0; + nReady = 0; + nStopped = 0; + nOthers = 0; + nError = 0; + nDiscarded = 0; + nSUCCEED = 0; for (final String id : getExecutableManger().getAllJobIdsInCache()) { if (isJobPoolFull()) { return; @@ -61,22 +67,7 @@ public class DefaultFetcherRunner extends FetcherRunner { final Output outputDigest = getExecutableManger().getOutputDigest(id); if ((outputDigest.getState() != ExecutableState.READY)) { // logger.debug("Job id:" + id + " not runnable"); - if (outputDigest.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (outputDigest.getState() == ExecutableState.ERROR) { - nError++; - } else if (outputDigest.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (outputDigest.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - if (fetchFailed) { - getExecutableManger().forceKillJob(id); - nError++; - } else { - nOthers++; - } - } + jobStateCount(id); continue; } diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java index 9d8f20e..36d6250 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java @@ -24,6 +24,8 @@ import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.Output; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,7 @@ public abstract class FetcherRunner implements Runnable { protected DefaultContext context; protected JobExecutor jobExecutor; protected volatile boolean fetchFailed = false; + protected static int nRunning, nReady, nStopped, nOthers, nError, nDiscarded, nSUCCEED; public FetcherRunner(JobEngineConfig jobEngineConfig, DefaultContext context, JobExecutor jobExecutor) { this.jobEngineConfig = jobEngineConfig; @@ -66,6 +69,27 @@ public abstract class FetcherRunner implements Runnable { logger.warn(jobDesc + " fail to schedule", ex); } } + + protected void jobStateCount(String id) { + final Output outputDigest = getExecutableManger().getOutputDigest(id); + // logger.debug("Job id:" + id + " not runnable"); + if (outputDigest.getState() == ExecutableState.SUCCEED) { + nSUCCEED++; + } else if (outputDigest.getState() == ExecutableState.ERROR) { + nError++; + } else if (outputDigest.getState() == ExecutableState.DISCARDED) { + nDiscarded++; + } else if (outputDigest.getState() == ExecutableState.STOPPED) { + nStopped++; + } else { + if (fetchFailed) { + getExecutableManger().forceKillJob(id); + nError++; + } else { + nOthers++; + } + } + } @VisibleForTesting void setFetchFailed(boolean fetchFailed) { diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java index 0792ed0..22732ea 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java @@ -83,7 +83,13 @@ public class PriorityFetcherRunner extends FetcherRunner { executableWithPriority.getSecond() + 1); } - int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0; + nRunning = 0; + nReady = 0; + nStopped = 0; + nOthers = 0; + nError = 0; + nDiscarded = 0; + nSUCCEED = 0; for (final String id : getExecutableManger().getAllJobIdsInCache()) { if (runningJobs.containsKey(id)) { // logger.debug("Job id:" + id + " is already running"); @@ -93,23 +99,7 @@ public class PriorityFetcherRunner extends FetcherRunner { final Output outputDigest = getExecutableManger().getOutputDigest(id); if ((outputDigest.getState() != ExecutableState.READY)) { - // logger.debug("Job id:" + id + " not runnable"); - if (outputDigest.getState() == ExecutableState.SUCCEED) { - nSUCCEED++; - } else if (outputDigest.getState() == ExecutableState.ERROR) { - nError++; - } else if (outputDigest.getState() == ExecutableState.DISCARDED) { - nDiscarded++; - } else if (outputDigest.getState() == ExecutableState.STOPPED) { - nStopped++; - } else { - if (fetchFailed) { - getExecutableManger().forceKillJob(id); - nError++; - } else { - nOthers++; - } - } + jobStateCount(id); continue; } @@ -122,7 +112,7 @@ public class PriorityFetcherRunner extends FetcherRunner { nReady++; Integer priority = leftJobPriorities.get(id); if (priority == null) { - priority = executable.getDefaultPriority(); + priority = executable.getPriority(); } jobPriorityQueue.add(new Pair<>(executable, priority)); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 1695a22..0e78e9c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -42,8 +42,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { private final IMRBatchCubingInputSide inputSide; private final IMRBatchCubingOutputSide2 outputSide; - public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) { - super(newSegment, submitter); + public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) { + super(newSegment, submitter, priorityOffset); this.inputSide = MRUtil.getBatchCubingInputSide(seg); this.outputSide = MRUtil.getBatchCubingOutputSide2(seg); } @@ -86,6 +86,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext)); inputSide.addStepPhase4_Cleanup(result); outputSide.addStepPhase4_Cleanup(result); + + // Set the task priority if specified + result.setPriorityBasedOnPriorityOffset(priorityOffset); + result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset)); return result; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 11c7d36..4a83dea 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -59,6 +59,7 @@ public class JobBuilderSupport { final protected JobEngineConfig config; final protected CubeSegment seg; final protected String submitter; + final protected Integer priorityOffset; final public static String LayeredCuboidFolderPrefix = "level_"; @@ -68,10 +69,15 @@ public class JobBuilderSupport { final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); public JobBuilderSupport(CubeSegment seg, String submitter) { + this(seg, submitter, 0); + } + + public JobBuilderSupport(CubeSegment seg, String submitter, Integer priorityOffset) { Preconditions.checkNotNull(seg, "segment cannot be null"); this.config = new JobEngineConfig(seg.getConfig()); this.seg = seg; this.submitter = submitter; + this.priorityOffset = priorityOffset; } public MapReduceExecutable createFactDistinctColumnsStep(String jobId) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java index 665e791..4aceae0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java @@ -38,8 +38,8 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine { } @Override - public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { - return new BatchCubingJobBuilder2(newSegment, submitter).build(); + public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) { + return new BatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build(); } @Override diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java index 47316b4..d3afb03 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java @@ -40,8 +40,8 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine { } @Override - public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { - return new SparkBatchCubingJobBuilder2(newSegment, submitter).build(); + public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) { + return new SparkBatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build(); } @Override diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 089ca24..1d2e78e 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -45,7 +45,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { private final ISparkOutput.ISparkBatchCubingOutputSide outputSide; public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) { - super(newSegment, submitter); + this(newSegment, submitter, 0); + } + + public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) { + super(newSegment, submitter, priorityOffset); this.inputSide = SparkUtil.getBatchCubingInputSide(seg); this.outputSide = SparkUtil.getBatchCubingOutputSide(seg); } @@ -89,6 +93,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { inputSide.addStepPhase4_Cleanup(result); outputSide.addStepPhase4_Cleanup(result); + // Set the task priority if specified + result.setPriorityBasedOnPriorityOffset(priorityOffset); + result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset)); + return result; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 08b03a4..c3f45a6 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -352,7 +352,7 @@ public class CubeController extends BasicController { @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) { return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null, - req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()); + req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset()); } /** @@ -380,19 +380,19 @@ public class CubeController extends BasicController { public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) { return buildInternal(cubeName, null, new SegmentRange(req.getSourceOffsetStart(), req.getSourceOffsetEnd()), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), - req.isForce()); + req.isForce(), req.getPriorityOffset()); } private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, - String buildType, boolean force) { + String buildType, boolean force, Integer priorityOffset) { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); checkBuildingSegment(cube); return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd, - CubeBuildTypeEnum.valueOf(buildType), force, submitter); + CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage(), e); diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java index 270d2d5..ff747a1 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java +++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest.java @@ -30,6 +30,8 @@ public class JobBuildRequest { private boolean forceMergeEmptySegment = false; + private Integer priorityOffset = 0; + public long getStartTime() { return startTime; } @@ -69,4 +71,12 @@ public class JobBuildRequest { public void setForceMergeEmptySegment(boolean forceMergeEmptySegment) { this.forceMergeEmptySegment = forceMergeEmptySegment; } + + public Integer getPriorityOffset() { + return priorityOffset; + } + + public void setPriorityOffset(Integer priorityOffset) { + this.priorityOffset = priorityOffset; + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java index a5986ad..ab20996 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobBuildRequest2.java @@ -35,6 +35,8 @@ public class JobBuildRequest2 { private String buildType; private boolean force; + + private Integer priorityOffset = 0; public long getSourceOffsetStart() { return sourceOffsetStart; @@ -84,4 +86,11 @@ public class JobBuildRequest2 { this.force = force; } + public Integer getPriorityOffset() { + return priorityOffset; + } + + public void setPriorityOffset(Integer priorityOffset) { + this.priorityOffset = priorityOffset; + } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 31a4119..8182f3d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -204,17 +204,17 @@ public class JobService extends BasicService implements InitializingBean { public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, - CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { + CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException { aclEvaluate.checkProjectOperationPermission(cube); JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart, - sourcePartitionOffsetEnd, buildType, force, submitter); + sourcePartitionOffsetEnd, buildType, force, submitter, priorityOffset); return jobInstance; } public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, // - CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { + CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException { Message msg = MsgPicker.getMsg(); if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { @@ -238,13 +238,13 @@ public class JobService extends BasicService implements InitializingBean { sourcePartitionOffsetEnd); src = source.enrichSourcePartitionBeforeBuild(cube, src); newSeg = getCubeManager().appendSegment(cube, src); - job = EngineFactory.createBatchCubingJob(newSeg, submitter); + job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset); } else if (buildType == CubeBuildTypeEnum.MERGE) { newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange); - job = EngineFactory.createBatchCubingJob(newSeg, submitter); + job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset); } else { throw new BadRequestException(String.format(Locale.ROOT, msg.getINVALID_BUILD_TYPE(), buildType)); } diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java index b3b1126..eb4f7f7 100644 --- a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLI.java @@ -107,13 +107,13 @@ public class CubeBuildingCLI extends AbstractApplication { if (buildType == CubeBuildTypeEnum.BUILD) { CubeSegment newSeg = cubeManager.appendSegment(cube, tsRange); - job = EngineFactory.createBatchCubingJob(newSeg, submitter); + job = EngineFactory.createBatchCubingJob(newSeg, submitter, null); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = cubeManager.mergeSegments(cube, tsRange, null, forceMergeEmptySeg); job = EngineFactory.createBatchMergeJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.REFRESH) { CubeSegment refreshSeg = cubeManager.refreshSegment(cube, tsRange, null); - job = EngineFactory.createBatchCubingJob(refreshSeg, submitter); + job = EngineFactory.createBatchCubingJob(refreshSeg, submitter, null); } else { throw new JobException("invalid build type:" + buildType); }