This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a9de3a2a27169a9695684a7dd1c2f7c5f807e10d Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Wed Nov 9 14:49:15 2022 +0800 KYLIN-5390 Build segment support for overlap segments --- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../resources/kylin_error_msg_conf_cn.properties | 2 +- .../resources/kylin_error_msg_conf_en.properties | 2 +- .../apache/kylin/common/KylinConfigBaseTest.java | 9 +++ .../org/apache/kylin/job/common/SegmentUtil.java | 18 ++--- .../org/apache/kylin/job/common/SegmentsTest.java | 53 ++++++++++++-- .../kylin/rest/controller/SegmentController.java | 3 +- .../kylin/rest/service/ModelBuildService.java | 18 ++++- .../apache/kylin/rest/service/ModelService.java | 53 +++++++++----- .../kylin/rest/service/ModelServiceTest.java | 80 +++++++++++++++++++++- .../spark/merger/AfterBuildResourceMerger.java | 6 +- 11 files changed, 209 insertions(+), 39 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 64606df279..309ce19ced 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3670,6 +3670,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180")); } + public boolean isBuildSegmentOverlapEnabled() { + return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE)); + } + public boolean getDDLEnabled(){ return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE)); } diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties index 759514ee40..7fab056ac7 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties @@ -40,7 +40,7 @@ KE-010025201=无法找到相关 Cube。 ## 100222XX Segment KE-010022201=无法刷新,Segment 范围 “%s” 超出了加载数据的范围 “%s”。请修改后重试。 -KE-010022202=无法构建,待构建的范围和已构建的范围在 “%s” 到 “%s” 之间存在重合。请修改后重试。 +KE-010022202=无法构建,待构建的范围和已构建的范围 “%s” 存在重合。请修改后重试。 KE-010022203=无法刷新,请选择 Segment 后再试。 KE-010022204=无法刷新,部分 Segment 正在构建。请稍后再试。 KE-010022205=无法刷新,所选 Segment 范围为空。请重新选择后再试。 diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties index 02ecccdb25..967a6c7d8c 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties @@ -40,7 +40,7 @@ KE-010025201=Can't find the cube. ## 100222XX Segment KE-010022201=Can't refresh. The segment range "%s" exceeds the range of loaded data, which is "%s". Please modify and try again. -KE-010022202=Can't build segment. The specified data range overlaps with the built segments from "%s" to "%s". Please modify and try again. +KE-010022202=Can't build segment. The specified data range overlaps with the built segments "%s". Please modify and try again. KE-010022203=Can't refresh. Please select segment and try again. KE-010022204=Can't refresh, some segments are being built. Please try again later. KE-010022205=Can't refresh, the selected segment range is empty. Please reselect and try again. diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index 16f653d46b..f2c3cb1a2f 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1380,6 +1380,15 @@ class KylinConfigBaseTest { val sub2 = config.getSubstitutor(); Assertions.assertSame(sub1, sub2); } + + @Test + void testIsBuildSegmentOverlapEnabled() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.build.segment-overlap-enabled", "false"); + assertFalse(config.isBuildSegmentOverlapEnabled()); + config.setProperty("kylin.build.segment-overlap-enabled", "true"); + assertTrue(config.isBuildSegmentOverlapEnabled()); + } } class EnvironmentUpdateUtils { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java index e04777ef7f..0185cee866 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java @@ -17,6 +17,7 @@ */ package org.apache.kylin.job.common; +import static org.apache.kylin.job.execution.JobTypeEnum.INC_BUILD; import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_BUILD; import static org.apache.kylin.job.execution.JobTypeEnum.SUB_PARTITION_BUILD; @@ -30,13 +31,13 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.PartitionStatusEnum; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay; import org.apache.kylin.metadata.model.Segments; -import org.apache.kylin.metadata.cube.model.NDataSegment; -import org.apache.kylin.metadata.cube.model.NDataflowManager; -import org.apache.kylin.metadata.cube.model.PartitionStatusEnum; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; @@ -64,12 +65,13 @@ public class SegmentUtil { Segments<T> overlapSegs = segments.getSegmentsByRange(segment.getSegRange()); overlapSegs.remove(segment); if (SegmentStatusEnum.NEW == segment.getStatus()) { - if (CollectionUtils.isEmpty(overlapSegs)) { - return SegmentStatusEnumToDisplay.LOADING; + if (!CollectionUtils.isEmpty(overlapSegs) + && overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) { + return SegmentStatusEnumToDisplay.REFRESHING; } - if (overlapSegs.get(0).getSegRange().entireOverlaps(segment.getSegRange())) { - return SegmentStatusEnumToDisplay.REFRESHING; + if (CollectionUtils.isEmpty(overlapSegs) || anyIndexJobRunning(segment, executables)) { + return SegmentStatusEnumToDisplay.LOADING; } return SegmentStatusEnumToDisplay.MERGING; @@ -102,7 +104,7 @@ public class SegmentUtil { protected static <T extends ISegment> boolean anyIndexJobRunning(T segment) { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, segment.getModel().getProject()); - val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, + val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, INC_BUILD, SUB_PARTITION_BUILD); return executables.stream().anyMatch(task -> task.getSegmentIds().contains(segment.getId())); } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java index 97eab4cdb8..587e688e21 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java @@ -19,19 +19,23 @@ package org.apache.kylin.job.common; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.job.execution.DefaultExecutable; +import org.apache.kylin.junit.TimeZoneTestRunner; +import org.apache.kylin.metadata.cube.model.NBatchConstants; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay; import org.apache.kylin.metadata.model.Segments; -import org.apache.kylin.junit.TimeZoneTestRunner; -import org.apache.kylin.metadata.cube.model.NDataSegment; -import org.apache.kylin.metadata.cube.model.NDataflow; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import com.google.common.collect.Lists; + import lombok.val; @RunWith(TimeZoneTestRunner.class) @@ -59,6 +63,41 @@ public class SegmentsTest { Assert.assertEquals(status, SegmentStatusEnumToDisplay.LOADING); } + @Test + public void testGetSegmentStatusToDisplay_Loading() { + Segments segments = new Segments(); + val seg = NDataSegment.empty(); + seg.setId("1"); + seg.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 10L)); + seg.setStatus(SegmentStatusEnum.READY); + segments.add(seg); + + val seg2 = NDataSegment.empty(); + seg2.setId("2"); + seg2.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 15L)); + seg2.setStatus(SegmentStatusEnum.NEW); + + val job = new DefaultExecutable(); + job.setParam(NBatchConstants.P_SEGMENT_IDS, "2"); + + SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, + Lists.newArrayList(job)); + Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status); + + val seg3 = NDataSegment.empty(); + seg3.setId("3"); + seg3.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(11L, 12L)); + seg3.setStatus(SegmentStatusEnum.NEW); + segments.add(seg3); + + val job2 = new DefaultExecutable(); + job2.setParam(NBatchConstants.P_SEGMENT_IDS, "3"); + + SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg3, + Lists.newArrayList(job2)); + Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status2); + } + @Test public void testGetSegmentStatusToDisplay_Ready() { Segments segments = new Segments(); @@ -157,15 +196,21 @@ public class SegmentsTest { newSeg.setStatus(SegmentStatusEnum.NEW); segments.add(newSeg); + Mockito.mockStatic(SegmentUtil.class); + Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null)).thenCallRealMethod(); + Mockito.when(SegmentUtil.anyIndexJobRunning(newSeg)).thenReturn(false); SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, newSeg, null); Assert.assertEquals(status, SegmentStatusEnumToDisplay.MERGING); + Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg, null)).thenCallRealMethod(); + Mockito.when(SegmentUtil.anyIndexJobRunning(seg)).thenReturn(false); SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg, null); Assert.assertEquals(status2, SegmentStatusEnumToDisplay.LOCKED); + Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null)).thenCallRealMethod(); + Mockito.when(SegmentUtil.anyIndexJobRunning(seg2)).thenReturn(false); SegmentStatusEnumToDisplay status3 = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null); Assert.assertEquals(status3, SegmentStatusEnumToDisplay.LOCKED); - } public NDataSegment newReadySegment(Long startTime, Long endTime) { diff --git a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java index 5f722bd282..d1f706e275 100644 --- a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java +++ b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/SegmentController.java @@ -166,7 +166,8 @@ public class SegmentController extends BaseController { modelId); validateDataRange(buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(), partitionColumnFormat); val res = modelService.checkSegHoleExistIfNewRangeBuild(buildSegmentsRequest.getProject(), modelId, - buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd()); + buildSegmentsRequest.getStart(), buildSegmentsRequest.getEnd(), + buildSegmentsRequest.isBuildAllIndexes(), buildSegmentsRequest.getBatchIndexIds()); return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, res, ""); } diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java index 93724c8c26..6d4e17b81b 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java @@ -25,6 +25,7 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CONCURR import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_DUPLICATE; import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP; import java.io.IOException; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Set; +import java.util.StringJoiner; import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; @@ -371,7 +373,9 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil Preconditions.checkArgument(!PushDownUtil.needPushdown(params.getStart(), params.getEnd()), "Load data must set start and end date"); val segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(params.getStart(), params.getEnd()); - modelService.checkSegmentToBuildOverlapsBuilt(project, modelId, segmentRangeToBuild); + List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(project, + modelDescInTransaction, segmentRangeToBuild, params.isNeedBuild(), params.getBatchIndexIds()); + buildSegmentOverlapExceptionInfo(overlapSegments); modelService.saveDateFormatIfNotExist(project, modelId, params.getPartitionColFormat()); checkMultiPartitionBuildParam(modelDescInTransaction, params); NDataSegment newSegment = getManager(NDataflowManager.class, project).appendSegment(df, segmentRangeToBuild, @@ -394,6 +398,18 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil .licenseCheckWrap(project, () -> jobManager.addSegmentJob(jobParam))); } + private void buildSegmentOverlapExceptionInfo(List<NDataSegment> overlapSegments) { + if (CollectionUtils.isEmpty(overlapSegments)) { + return; + } + + StringJoiner joiner = new StringJoiner(",", "[", "]"); + for (NDataSegment seg : overlapSegments) { + joiner.add(seg.getName()); + } + throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP, joiner.toString()); + } + public void checkMultiPartitionBuildParam(NDataModel model, IncrementBuildSegmentParams params) { if (!model.isMultiPartitionModel()) { return; diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java index 07cab15306..e4af48b957 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -57,7 +57,6 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_NOT_EXIST; import static org.apache.kylin.common.exception.code.ErrorCodeServer.PARAMETER_INVALID_SUPPORT_LIST; import static org.apache.kylin.common.exception.code.ErrorCodeServer.PROJECT_NOT_EXIST; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_BUILD_RANGE_OVERLAP; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_LOCKED; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_MERGE_CONTAINS_GAPS; import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_NOT_EXIST_ID; @@ -929,7 +928,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, project); return execManager.listPartialExec(path -> StringUtils.endsWith(path, modelId), ExecutableState::isRunning, - INDEX_BUILD, JobTypeEnum.SUB_PARTITION_BUILD); + INDEX_BUILD, INC_BUILD, JobTypeEnum.SUB_PARTITION_BUILD); } public List<NDataSegmentResponse> getSegmentsResponse(String modelId, String project, String start, String end, @@ -2295,7 +2294,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp } public SegmentCheckResponse checkSegHoleExistIfNewRangeBuild(String project, String modelId, String start, - String end) { + String end, boolean isBuildAllIndexes, List<Long> batchIndexIds) { aclEvaluate.checkProjectOperationPermission(project); Preconditions.checkArgument(!PushDownUtil.needPushdown(start, end), "Load data must set start and end date"); NDataModel dataModelDesc = getManager(NDataModelManager.class, project).getDataModelDesc(modelId); @@ -2304,18 +2303,17 @@ public class ModelService extends AbstractModelService implements TableModelSupp SegmentRange segmentRangeToBuild = SourceFactory.getSource(table).getSegmentRange(start, end); List<NDataSegment> segmentGaps = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project) .checkHoleIfNewSegBuild(modelId, segmentRangeToBuild); - - Segments<NDataSegment> segments = getSegmentsByRange(modelId, project, "0", "" + Long.MAX_VALUE); - val overlapSegments = segments.stream().filter(seg -> seg.getSegRange().overlaps(segmentRangeToBuild)) - .map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd())) + List<NDataSegment> overlapSegments = checkSegmentToBuildOverlapsBuilt(project, dataModelDesc, + segmentRangeToBuild, isBuildAllIndexes, batchIndexIds); + val overlapSegmentResponses = overlapSegments.stream().map( + segment -> new SegmentRangeResponse(segment.getTSRange().getStart(), segment.getTSRange().getEnd())) .collect(Collectors.toList()); - SegmentCheckResponse segmentCheckResponse = new SegmentCheckResponse(); val segHoles = segmentGaps.stream() .map(seg -> new SegmentRangeResponse(seg.getTSRange().getStart(), seg.getTSRange().getEnd())) .collect(Collectors.toList()); segmentCheckResponse.setSegmentHoles(segHoles); - segmentCheckResponse.setOverlapSegments(overlapSegments); + segmentCheckResponse.setOverlapSegments(overlapSegmentResponses); return segmentCheckResponse; } @@ -2563,17 +2561,36 @@ public class ModelService extends AbstractModelService implements TableModelSupp datamodelManager.updateDataModelDesc(modelUpdate); } - public void checkSegmentToBuildOverlapsBuilt(String project, String model, SegmentRange segmentRangeToBuild) { - Segments<NDataSegment> segments = getSegmentsByRange(model, project, "0", "" + Long.MAX_VALUE); - if (!CollectionUtils.isEmpty(segments)) { - for (NDataSegment existedSegment : segments) { - if (existedSegment.getSegRange().overlaps(segmentRangeToBuild)) { - throw new KylinException(SEGMENT_BUILD_RANGE_OVERLAP, - existedSegment.getSegRange().getStart().toString(), - existedSegment.getSegRange().getEnd().toString()); - } + public List<NDataSegment> checkSegmentToBuildOverlapsBuilt(String project, NDataModel model, + SegmentRange<Long> segmentRangeToBuild, boolean isBuildAllIndexes, List<Long> batchIndexIds) { + boolean isOverlap; + Segments<NDataSegment> segments = getSegmentsByRange(model.getId(), project, "0", "" + Long.MAX_VALUE); + List<NDataSegment> overlapsBuiltSegment = Lists.newArrayListWithCapacity(segments.size()); + + if (CollectionUtils.isEmpty(segments)) { + return overlapsBuiltSegment; + } + + boolean buildSegmentOverlapEnable = getIndexPlan(model.getId(), project).getConfig() + .isBuildSegmentOverlapEnabled(); + boolean isBuildAllIndexesFinally = batchIndexIds == null || batchIndexIds.size() == 0 + || batchIndexIds.size() == getIndexPlan(model.getId(), project).getAllIndexes().size(); + + for (NDataSegment existedSegment : segments) { + if (buildSegmentOverlapEnable && NDataModel.ModelType.BATCH == model.getModelType() + && !model.isMultiPartitionModel() && isBuildAllIndexes && isBuildAllIndexesFinally + && !SecondStorageUtil.isModelEnable(project, model.getId())) { + isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild) + && !segmentRangeToBuild.contains(existedSegment.getSegRange()); + } else { + isOverlap = existedSegment.getSegRange().overlaps(segmentRangeToBuild); + } + if (isOverlap) { + overlapsBuiltSegment.add(existedSegment); } } + + return overlapsBuiltSegment; } public ComputedColumnUsageResponse getComputedColumnUsages(String project) { diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java index e9dc0f22c3..8d3a4462b2 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java @@ -3889,19 +3889,93 @@ public class ModelServiceTest extends SourceTestCase { Assert.assertEquals(0, res.getOverlapSegments().size()); Assert.assertEquals(1, res.getSegmentHoles().size()); - res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000"); + res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "20000", "30000", true, null); Assert.assertEquals(0, res.getOverlapSegments().size()); Assert.assertEquals(3, res.getSegmentHoles().size()); - res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10"); + res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "10", true, null); Assert.assertEquals(0, res.getOverlapSegments().size()); Assert.assertEquals(1, res.getSegmentHoles().size()); - res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5"); + res = modelService.checkSegHoleExistIfNewRangeBuild(getProject(), modelId, "1", "5", true, null); Assert.assertEquals(0, res.getOverlapSegments().size()); Assert.assertEquals(2, res.getSegmentHoles().size()); } + @Test + public void testCheckSegmentToBuildOverlapsBuilt() throws IOException { + KylinConfig kylinConfig = getTestConfig(); + final String defaultProject = getProject(); + final String streamingProject = "streaming_test"; + NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, defaultProject); + + kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "true"); + + List<NDataSegment> overlapSegments = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + modelManager.getDataModelDesc("b780e4e4-69af-449e-b09f-05c90dfa04b6"), + new SegmentRange.TimePartitionedSegmentRange(1604188800000L, 1604361600000L), true, null); + Assert.assertEquals(3, overlapSegments.size()); + + val streamingModelManager = NDataModelManager.getInstance(getTestConfig(), streamingProject); + List<NDataSegment> overlapSegments2 = modelService.checkSegmentToBuildOverlapsBuilt(streamingProject, + streamingModelManager.getDataModelDesc("e78a89dd-847f-4574-8afa-8768b4228b74"), + new SegmentRange.KafkaOffsetPartitionedSegmentRange(1613957110000L, 1613957130000L), true, null); + Assert.assertEquals(2, overlapSegments2.size()); + + String modelId = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96"; + NDataModel dataModelDesc = modelManager.getDataModelDesc(modelId); + List<NDataSegment> overlapSegments3 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true, + null); + Assert.assertEquals(0, overlapSegments3.size()); + + List<NDataSegment> overlapSegments4 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true, + null); + Assert.assertEquals(0, overlapSegments4.size()); + + List<NDataSegment> overlapSegments5 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), false, + null); + Assert.assertEquals(1, overlapSegments5.size()); + + List<NDataSegment> overlapSegments6 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true, + Lists.newArrayList()); + Assert.assertEquals(0, overlapSegments6.size()); + + List<NDataSegment> overlapSegments7 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1609891513770L), true, + Lists.newArrayList(10000L)); + Assert.assertEquals(1, overlapSegments7.size()); + + List<NDataSegment> overlapSegments8 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513780L, 1509891513760L), true, + null); + Assert.assertEquals(1, overlapSegments8.size()); + + MockSecondStorage.mock(defaultProject, new ArrayList<>(), this); + val indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), defaultProject); + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + indexPlanManager.updateIndexPlan(modelId, indexPlan -> { + indexPlan.createAndAddBaseIndex(indexPlan.getModel()); + }); + return null; + }, defaultProject); + SecondStorageUtil.initModelMetaData(defaultProject, modelId); + Assert.assertTrue(SecondStorageUtil.isModelEnable(defaultProject, modelId)); + List<NDataSegment> overlapSegments31 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true, + null); + Assert.assertEquals(1, overlapSegments31.size()); + + kylinConfig.setProperty("kylin.build.segment-overlap-enabled", "false"); + List<NDataSegment> overlapSegments9 = modelService.checkSegmentToBuildOverlapsBuilt(defaultProject, + dataModelDesc, new SegmentRange.TimePartitionedSegmentRange(1309891513770L, 1509891513770L), true, + null); + Assert.assertEquals(1, overlapSegments9.size()); + } + @Test public void testUpdateModelOwner() throws IOException { String project = "default"; diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java index 1913f45ba2..117d381df8 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java @@ -25,16 +25,16 @@ import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.JobTypeEnum; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.engine.spark.ExecutableUtils; import org.apache.kylin.metadata.cube.model.NDataLayout; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NDataflowUpdate; import org.apache.kylin.metadata.cube.model.PartitionStatusEnum; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -91,6 +91,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger { val dfUpdate = new NDataflowUpdate(flowName); val theSeg = remoteDataflow.getSegment(segmentId); + val toRemoveSegments = remoteDataflowManager.getToRemoveSegs(remoteDataflow, theSeg); if (theSeg.getModel().isMultiPartitionModel()) { final long lastBuildTime = System.currentTimeMillis(); @@ -107,6 +108,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger { theSeg.setStatus(SegmentStatusEnum.READY); dfUpdate.setToUpdateSegs(theSeg); + dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()])); dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getLayouts().toArray(new NDataLayout[0])); localDataflowManager.updateDataflow(dfUpdate);