This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 9b473d1 Add rule for submit build job: forbid submitting appending segment in the lambda stream cube (#228) 9b473d1 is described below commit 9b473d14a18044794ce0f23047df715e7ed34d0a Author: kliu3 <liu...@apache.org> AuthorDate: Fri Apr 16 14:26:17 2021 +0800 Add rule for submit build job: forbid submitting appending segment in the lambda stream cube (#228) --- .../main/java/org/apache/kylin/cube/CubeInstance.java | 4 ++++ .../org/apache/kylin/rest/service/JobService.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 7c81e4f..ee98b54 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -773,4 +773,8 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, throw new IllegalStateException("No segment's last build job ID equals " + jobID); } + public boolean isStreamLambdaCube() { + return getModel().getRootFactTable().getTableDesc().isLambdaTable(); + } + } \ No newline at end of file diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index a5f44b5..41c74f7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -237,6 +237,9 @@ public class JobService extends BasicService implements InitializingBean { checkCubeDescSignature(cube); checkAllowBuilding(cube); + if (cube.isStreamLambdaCube() && buildType == CubeBuildTypeEnum.BUILD) { + checkStreamLambdaBuildingSegment(cube, tsRange); + } if (buildType == CubeBuildTypeEnum.BUILD || buildType == CubeBuildTypeEnum.REFRESH) { checkAllowParallelBuilding(cube); @@ -435,6 +438,22 @@ public class JobService extends BasicService implements InitializingBean { } } + private void checkStreamLambdaBuildingSegment(CubeInstance cube, TSRange tsRange) { + if (tsRange == null || tsRange.end.v < tsRange.start.v) { + throw new BadRequestException("The tsRange is invalid " + tsRange + + "for the cube " + cube.getName() + + ". It's not allowed to submit a build job"); + } + CubeSegment latestReadySegment = cube.getLatestReadySegment(); + + if (latestReadySegment != null && tsRange.end.v > latestReadySegment.getTSRange().end.v) { + throw new BadRequestException( + "The stream cube " + cube.getName() + "can't be submitted the appending segment. " + + "The latest ready segment tsrange is " + latestReadySegment.getTSRange() + + ", and the appending segment tsrange is " + tsRange); + } + } + private void checkAllowParallelBuilding(CubeInstance cube) { if (cube.getConfig().isCubePlannerEnabled()) { if (cube.getCuboids() == null) {