This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b473d1  Add rule for submit build job: forbid submitting appending 
segment in the lambda stream cube (#228)
9b473d1 is described below

commit 9b473d14a18044794ce0f23047df715e7ed34d0a
Author: kliu3 <liu...@apache.org>
AuthorDate: Fri Apr 16 14:26:17 2021 +0800

    Add rule for submit build job: forbid submitting appending segment in the 
lambda stream cube (#228)
---
 .../main/java/org/apache/kylin/cube/CubeInstance.java |  4 ++++
 .../org/apache/kylin/rest/service/JobService.java     | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 7c81e4f..ee98b54 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -773,4 +773,8 @@ public class CubeInstance extends RootPersistentEntity 
implements IRealization,
         throw new IllegalStateException("No segment's last build job ID equals 
" + jobID);
     }
 
+    public boolean isStreamLambdaCube() {
+        return getModel().getRootFactTable().getTableDesc().isLambdaTable();
+    }
+
 }
\ No newline at end of file
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index a5f44b5..41c74f7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -237,6 +237,9 @@ public class JobService extends BasicService implements 
InitializingBean {
 
         checkCubeDescSignature(cube);
         checkAllowBuilding(cube);
+        if (cube.isStreamLambdaCube() && buildType == CubeBuildTypeEnum.BUILD) 
{
+            checkStreamLambdaBuildingSegment(cube, tsRange);
+        }
 
         if (buildType == CubeBuildTypeEnum.BUILD || buildType == 
CubeBuildTypeEnum.REFRESH) {
             checkAllowParallelBuilding(cube);
@@ -435,6 +438,22 @@ public class JobService extends BasicService implements 
InitializingBean {
         }
     }
 
+    private void checkStreamLambdaBuildingSegment(CubeInstance cube, TSRange 
tsRange) {
+        if (tsRange == null || tsRange.end.v < tsRange.start.v) {
+            throw new BadRequestException("The tsRange is invalid " + tsRange +
+                "for the cube " + cube.getName() +
+                ". It's not allowed to submit a build job");
+        }
+        CubeSegment latestReadySegment = cube.getLatestReadySegment();
+
+        if (latestReadySegment != null && tsRange.end.v > 
latestReadySegment.getTSRange().end.v) {
+            throw new BadRequestException(
+                "The stream cube " + cube.getName() + "can't be submitted the 
appending segment. " +
+                    "The latest ready segment tsrange is " + 
latestReadySegment.getTSRange() +
+                    ", and the appending segment tsrange is " + tsRange);
+        }
+    }
+
     private void checkAllowParallelBuilding(CubeInstance cube) {
         if (cube.getConfig().isCubePlannerEnabled()) {
             if (cube.getCuboids() == null) {

Reply via email to