This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ed03f1e  KYLIN-4098: Add cube auto merge api
ed03f1e is described below

commit ed03f1e1c3e83ea4e97673c07677719b24ef096a
Author: Liu Shaohui <liushao...@xiaomi.com>
AuthorDate: Mon May 13 16:34:13 2019 +0800

    KYLIN-4098: Add cube auto merge api
---
 .../org/apache/kylin/metadata/model/Segments.java  |  8 ++---
 .../kylin/rest/controller/CubeController.java      | 37 +++++++++++++++++++++-
 .../org/apache/kylin/rest/service/CubeService.java | 15 ++++++---
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index d66f899..0fdb2ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -178,9 +178,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         }
         Segments volatileSegs = new Segments();
         for(T seg: segs) {
-            if(seg.getTSRange().end.v + volatileRange > latestSegEndTs) {
-                logger.warn("segment in volatile range: seg:" + seg.toString() +
-                        "rangeStart:" + seg.getTSRange().start.v + ", rangeEnd" + seg.getTSRange().end.v);
+            if(seg.getTSRange().end.v + volatileRange >= latestSegEndTs) {
+                logger.warn("segment in volatile range: seg: " + seg.toString() +
+                        ", rangeStart:" + seg.getTSRange().start.v + ", rangeEnd" + seg.getTSRange().end.v);
                 volatileSegs.add(seg);
             }
         }
@@ -485,4 +485,4 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         }
         return true;
     }
-}
\ No newline at end of file
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index f664e66..8efa420 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -79,6 +79,8 @@ import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
 import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.ValidateUtil;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
@@ -122,6 +124,13 @@ public class CubeController extends BasicController {
     @Qualifier("projectService")
     private ProjectService projectService;
 
+    @Autowired
+    @Qualifier("queryService")
+    private QueryService queryService;
+
+    @Autowired
+    private AclEvaluate aclEvaluate;
+
     @RequestMapping(value = "/validate/{cubeName}", method = RequestMethod.GET, produces = { "application/json" })
     @ResponseBody
     public EnvelopeResponse<Boolean> validateModelName(@PathVariable String cubeName) {
@@ -251,7 +260,6 @@ public class CubeController extends BasicController {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
         }
-
     }
 
     @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT }, produces = { "application/json" })
@@ -401,6 +409,33 @@ public class CubeController extends BasicController {
     }
 
     /**
+     * Send a auto merge cube job
+     *
+     * @param cubeName Cube ID
+     * @return JobInstance of merging cube
+     */
+    @RequestMapping(value = "/{cubeName}/automerge", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance autoMerge(@PathVariable String cubeName) {
+        try {
+            checkCubeExists(cubeName);
+
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+            aclEvaluate.checkProjectAdminPermission(cube.getProject());
+
+            String jobID = cubeService.mergeCubeSegment(cubeName);
+            if (jobID == null) {
+                throw new BadRequestException(String.format(Locale.ROOT,
+                    "Cube: %s merging is not supported or no segments to merge", cubeName));
+            }
+            return jobService.getJobInstance(jobID);
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
+    /**
      * Send a optimize cube job
      *
      * @param cubeName Cube ID
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 792fbd3..d068026 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -720,10 +720,15 @@ public class CubeService extends BasicService implements InitializingBean {
         }
     }
 
-    private void mergeCubeSegment(String cubeName) {
+    public String mergeCubeSegment(String cubeName) {
         CubeInstance cube = getCubeManager().getCube(cubeName);
         if (!cube.needAutoMerge())
-            return;
+            return null;
+
+        if (!cube.isReady()) {
+            logger.info("The cube: {} is disabled", cubeName);
+            return null;
+        }
 
         synchronized (CubeService.class) {
             try {
@@ -731,16 +736,18 @@ public class CubeService extends BasicService implements InitializingBean {
                 SegmentRange offsets = cube.autoMergeCubeSegments();
                 if (offsets != null && !isMergingJobBeenDiscarded(cube, cubeName, cube.getProject(), offsets)) {
                     CubeSegment newSeg = getCubeManager().mergeSegments(cube, null, offsets, true);
-                    logger.debug("Will submit merge job on " + newSeg);
+                    logger.info("Will submit merge job on " + newSeg);
                     DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
                     getExecutableManager().addJob(job);
+                    return job.getId();
                 } else {
-                    logger.debug("Not ready for merge on cube " + cubeName);
+                    logger.info("Not ready for merge on cube " + cubeName);
                 }
             } catch (IOException e) {
                 logger.error("Failed to auto merge cube " + cubeName, e);
             }
         }
+        return null;
     }
 
     //Don't merge the job that has been discarded manually before

Reply via email to