This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/master by this push:
     new ed03f1e  KYLIN-4098: Add cube auto merge api
ed03f1e is described below

commit ed03f1e1c3e83ea4e97673c07677719b24ef096a
Author: Liu Shaohui <liushao...@xiaomi.com>
AuthorDate: Mon May 13 16:34:13 2019 +0800

    KYLIN-4098: Add cube auto merge api
---
 .../org/apache/kylin/metadata/model/Segments.java  |  8 ++---
 .../kylin/rest/controller/CubeController.java      | 37 +++++++++++++++++++++-
 .../org/apache/kylin/rest/service/CubeService.java | 15 ++++++---
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index d66f899..0fdb2ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -178,9 +178,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         }
         Segments volatileSegs = new Segments();
         for(T seg: segs) {
-            if(seg.getTSRange().end.v + volatileRange > latestSegEndTs) {
-                logger.warn("segment in volatile range: seg:" + seg.toString() +
-                        "rangeStart:" + seg.getTSRange().start.v + ", rangeEnd" + seg.getTSRange().end.v);
+            if(seg.getTSRange().end.v + volatileRange >= latestSegEndTs) {
+                logger.warn("segment in volatile range: seg: " + seg.toString() +
+                        ", rangeStart:" + seg.getTSRange().start.v + ", rangeEnd" + seg.getTSRange().end.v);
                 volatileSegs.add(seg);
             }
         }
@@ -485,4 +485,4 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         }
         return true;
     }
-}
\ No newline at end of file
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index f664e66..8efa420 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -79,6 +79,8 @@ import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
 import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.ValidateUtil;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
@@ -122,6 +124,13 @@ public class CubeController extends BasicController {
     @Qualifier("projectService")
     private ProjectService projectService;
 
+    @Autowired
+    @Qualifier("queryService")
+    private QueryService queryService;
+
+    @Autowired
+    private AclEvaluate aclEvaluate;
+
     @RequestMapping(value = "/validate/{cubeName}", method = RequestMethod.GET, produces = { "application/json" })
     @ResponseBody
     public EnvelopeResponse<Boolean> validateModelName(@PathVariable String cubeName) {
@@ -251,7 +260,6 @@ public class CubeController extends BasicController {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
         }
-
     }
 
     @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT }, produces = { "application/json" })
@@ -401,6 +409,33 @@ public class CubeController extends BasicController {
     }
 
     /**
+     * Send a auto merge cube job
+     *
+     * @param cubeName Cube ID
+     * @return JobInstance of merging cube
+     */
+    @RequestMapping(value = "/{cubeName}/automerge", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance autoMerge(@PathVariable String cubeName) {
+        try {
+            checkCubeExists(cubeName);
+
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+            aclEvaluate.checkProjectAdminPermission(cube.getProject());
+
+            String jobID = cubeService.mergeCubeSegment(cubeName);
+            if (jobID == null) {
+                throw new BadRequestException(String.format(Locale.ROOT,
+                        "Cube: %s merging is not supported or no segments to merge", cubeName));
+            }
+            return jobService.getJobInstance(jobID);
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
+    /**
      * Send a optimize cube job
      *
      * @param cubeName Cube ID
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 792fbd3..d068026 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -720,10 +720,15 @@ public class CubeService extends BasicService implements InitializingBean {
         }
     }
 
-    private void mergeCubeSegment(String cubeName) {
+    public String mergeCubeSegment(String cubeName) {
         CubeInstance cube = getCubeManager().getCube(cubeName);
         if (!cube.needAutoMerge())
-            return;
+            return null;
+
+        if (!cube.isReady()) {
+            logger.info("The cube: {} is disabled", cubeName);
+            return null;
+        }
 
         synchronized (CubeService.class) {
             try {
@@ -731,16 +736,18 @@ public class CubeService extends BasicService implements InitializingBean {
                 SegmentRange offsets = cube.autoMergeCubeSegments();
                 if (offsets != null && !isMergingJobBeenDiscarded(cube, cubeName, cube.getProject(), offsets)) {
                     CubeSegment newSeg = getCubeManager().mergeSegments(cube, null, offsets, true);
-                    logger.debug("Will submit merge job on " + newSeg);
+                    logger.info("Will submit merge job on " + newSeg);
                     DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
                     getExecutableManager().addJob(job);
+                    return job.getId();
                 } else {
-                    logger.debug("Not ready for merge on cube " + cubeName);
+                    logger.info("Not ready for merge on cube " + cubeName);
                 }
             } catch (IOException e) {
                 logger.error("Failed to auto merge cube " + cubeName, e);
             }
         }
+        return null;
     }
 
     //Don't merge the job that has been discarded manually before