This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a1591a7e33e356a13a9978081733c6a6f55ababf Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Fri Nov 18 11:27:37 2022 +0800 [DIRTY] Fix when loading data to second storage is completed first, the segment status is still LOADING, not ONLINE(TIERED STORAGE) --- .../org/apache/kylin/job/common/SegmentUtil.java | 11 ++++-- .../org/apache/kylin/job/common/SegmentsTest.java | 40 ++++++++++++---------- .../apache/kylin/rest/service/ModelService.java | 2 +- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java index 0185cee866..f7dcde86ff 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java @@ -70,7 +70,7 @@ public class SegmentUtil { return SegmentStatusEnumToDisplay.REFRESHING; } - if (CollectionUtils.isEmpty(overlapSegs) || anyIndexJobRunning(segment, executables)) { + if (CollectionUtils.isEmpty(overlapSegs) || anyIncSegmentJobRunning(segment)) { return SegmentStatusEnumToDisplay.LOADING; } @@ -104,11 +104,18 @@ public class SegmentUtil { protected static <T extends ISegment> boolean anyIndexJobRunning(T segment) { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, segment.getModel().getProject()); - val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, INC_BUILD, + val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INDEX_BUILD, SUB_PARTITION_BUILD); return executables.stream().anyMatch(task -> task.getSegmentIds().contains(segment.getId())); } + protected static <T extends ISegment> boolean anyIncSegmentJobRunning(T segment) { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, segment.getModel().getProject()); + val executables = execManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, INC_BUILD); + return executables.stream().anyMatch(task -> task.getSegmentIds().contains(segment.getId())); + } + protected static <T extends ISegment> boolean anyIndexJobRunning(T segment, List<AbstractExecutable> executables) { if (Objects.isNull(executables)) { return anyIndexJobRunning(segment); diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java index 587e688e21..7c76c58197 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/common/SegmentsTest.java @@ -19,9 +19,7 @@ package org.apache.kylin.job.common; import org.apache.kylin.common.util.RandomUtil; -import org.apache.kylin.job.execution.DefaultExecutable; import org.apache.kylin.junit.TimeZoneTestRunner; -import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.model.SegmentRange; @@ -34,8 +32,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; -import com.google.common.collect.Lists; - import lombok.val; @RunWith(TimeZoneTestRunner.class) @@ -64,38 +60,46 @@ public class SegmentsTest { } @Test - public void testGetSegmentStatusToDisplay_Loading() { - Segments segments = new Segments(); + public void testGetSegmentStatusToDisplay_Loading_Merging() { + Segments<NDataSegment> segments = new Segments<>(); val seg = NDataSegment.empty(); - seg.setId("1"); - seg.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 10L)); + seg.setId("0"); + seg.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 5L)); seg.setStatus(SegmentStatusEnum.READY); segments.add(seg); + val seg1 = NDataSegment.empty(); + seg.setId("1"); + seg1.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(5L, 10L)); + seg1.setStatus(SegmentStatusEnum.READY); + segments.add(seg1); + val seg2 = NDataSegment.empty(); seg2.setId("2"); seg2.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 15L)); seg2.setStatus(SegmentStatusEnum.NEW); + segments.add(seg2); - val job = new DefaultExecutable(); - job.setParam(NBatchConstants.P_SEGMENT_IDS, "2"); + Mockito.mockStatic(SegmentUtil.class); + Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null)).thenCallRealMethod(); + Mockito.when(SegmentUtil.anyIncSegmentJobRunning(seg2)).thenReturn(true); + Mockito.when(SegmentUtil.anyIndexJobRunning(seg2)).thenReturn(false); - SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, - Lists.newArrayList(job)); + SegmentStatusEnumToDisplay status = SegmentUtil.getSegmentStatusToDisplay(segments, seg2, null); Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status); val seg3 = NDataSegment.empty(); seg3.setId("3"); - seg3.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(11L, 12L)); + seg3.setSegmentRange(new SegmentRange.TimePartitionedSegmentRange(0L, 10L)); seg3.setStatus(SegmentStatusEnum.NEW); segments.add(seg3); - val job2 = new DefaultExecutable(); - job2.setParam(NBatchConstants.P_SEGMENT_IDS, "3"); + Mockito.when(SegmentUtil.getSegmentStatusToDisplay(segments, seg3, null)).thenCallRealMethod(); + Mockito.when(SegmentUtil.anyIncSegmentJobRunning(seg3)).thenReturn(false); + Mockito.when(SegmentUtil.anyIndexJobRunning(seg3)).thenReturn(false); - SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg3, - Lists.newArrayList(job2)); - Assert.assertEquals(SegmentStatusEnumToDisplay.LOADING, status2); + SegmentStatusEnumToDisplay status2 = SegmentUtil.getSegmentStatusToDisplay(segments, seg3, null); + Assert.assertEquals(SegmentStatusEnumToDisplay.MERGING, status2); } @Test diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java index 8a0beec63b..d83c15c751 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -929,7 +929,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); NExecutableManager execManager = NExecutableManager.getInstance(kylinConfig, project); return execManager.listPartialExec(path -> StringUtils.endsWith(path, modelId), ExecutableState::isRunning, - INDEX_BUILD, INC_BUILD, JobTypeEnum.SUB_PARTITION_BUILD); + INDEX_BUILD, JobTypeEnum.SUB_PARTITION_BUILD); } public List<NDataSegmentResponse> getSegmentsResponse(String modelId, String project, String start, String end,