KYLIN-3177: fix null TSRange on the merged segment of an offset cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/27ccf441 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/27ccf441 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/27ccf441 Branch: refs/heads/KYLIN-2881-review Commit: 27ccf441c64ca28a0dfe86ced6ea8b56b88b02df Parents: c3d1745 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Thu Jan 18 17:33:35 2018 +0800 Committer: Cheng Wang <cheng.w...@kyligence.io> Committed: Thu Jan 18 17:40:02 2018 +0800 ---------------------------------------------------------------------- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 14 +++++++++++--- .../apache/kylin/provision/BuildCubeWithStream.java | 15 +++++++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/27ccf441/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 3185bec..018abab 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -31,11 +31,10 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - */ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { private static final Logger logger = 
LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class); @@ -66,10 +65,15 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { } long sourceCount = 0L; long sourceSize = 0L; + + boolean isOffsetCube = mergedSegment.isOffsetCube(); + Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L; for (String id : mergingSegmentIds) { CubeSegment segment = cube.getSegmentById(id); sourceCount += segment.getInputRecords(); sourceSize += segment.getInputRecordsSize(); + tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v); + tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v); } // update segment info @@ -79,6 +83,11 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); mergedSegment.setLastBuildTime(System.currentTimeMillis()); + if (isOffsetCube == true) { + SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax); + mergedSegment.setTSRange(tsRange); + } + try { cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment); return new ExecuteResult(ExecuteResult.State.SUCCEED); @@ -87,5 +96,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { return ExecuteResult.createError(e); } } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/27ccf441/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index f9277bc..f7b8275 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -232,7 +232,8 @@ public class BuildCubeWithStream { for (int i = 0; i < futures.size(); i++) { ExecutableState result = futures.get(i).get(20, 
TimeUnit.MINUTES); logger.info("Checking building task " + i + " whose state is " + result); - Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED); + Assert.assertTrue( + result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED); if (result == ExecutableState.SUCCEED) succeedBuild++; } @@ -250,6 +251,9 @@ public class BuildCubeWithStream { segments = cubeManager.getCube(cubeName).getSegments(); Assert.assertTrue(segments.size() == 1); + SegmentRange.TSRange tsRange = segments.get(0).getTSRange(); + Assert.assertTrue(tsRange.duration() > 0); + CubeSegment toRefreshSeg = segments.get(0); refreshSegment(cubeName, toRefreshSeg.getSegRange()); @@ -279,7 +283,8 @@ public class BuildCubeWithStream { protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeInstance cubeInstance = cubeManager.getCube(cubeName); ISource source = SourceFactory.getSource(cubeInstance); - SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null)); + SourcePartition partition = source.enrichSourcePartitionBeforeBuild(cubeInstance, + new SourcePartition(null, new SegmentRange(startOffset, endOffset), null, null)); CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); @@ -298,7 +303,8 @@ public class BuildCubeWithStream { ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169"); + throw new 
RuntimeException( + "No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169"); } HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } @@ -326,7 +332,8 @@ public class BuildCubeWithStream { protected void waitForJob(String jobId) { while (true) { AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) { + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR + || job.getStatus() == ExecutableState.DISCARDED) { break; } else { try {