KYLIN-3175: update streaming segment's TSRange after merge
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e071c1cc Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e071c1cc Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e071c1cc Branch: refs/heads/sync Commit: e071c1ccd74b2ca5aaf6449d71e022d530b66f26 Parents: 93a42b3 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Wed Jan 17 15:55:16 2018 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Jan 26 19:07:39 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 33 ++++++++++++-------- .../kylin/provision/BuildCubeWithStream.java | 2 +- 2 files changed, 21 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e071c1cc/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index df1d95e..8bdb5aa 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -276,17 +276,17 @@ public class CubeManager implements IRealizationProvider { return updateCube(update); } } - + public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum status) throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { CubeInstance cube = seg.getCubeInstance().latestCopyForWrite(); seg = cube.getSegmentById(seg.getUuid()); - + CubeUpdate update = new CubeUpdate(cube); seg.setStatus(status); update.setToUpdateSegs(seg); return updateCube(update); - } + } } private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException { @@ -523,7 +523,7 @@ public class CubeManager implements 
IRealizationProvider { return segAssist.optimizeSegments(cube, cuboidsRecommend); } } - + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { @@ -547,9 +547,9 @@ public class CubeManager implements IRealizationProvider { CubeSegment... optimizedSegments) throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { segAssist.promoteCheckpointOptimizeSegments(cube, recommendCuboids, optimizedSegments); - } + } } - + public List<CubeSegment> calculateHoles(String cubeName) { return segAssist.calculateHoles(cubeName); } @@ -683,8 +683,13 @@ public class CubeManager implements IRealizationProvider { tsRange = null; Preconditions.checkArgument(segRange != null); } else { - if(tsRange == null) { - tsRange = new TSRange((Long)segRange.start.v, (Long)segRange.end.v); + /**In case of non-streaming segment, + * tsRange is the same as segRange, + * either could fulfill the merge job, + * so it needs to convert segRange to tsRange if tsRange is null. + **/ + if (tsRange == null) { + tsRange = new TSRange((Long) segRange.start.v, (Long) segRange.end.v); } segRange = null; } @@ -832,10 +837,11 @@ public class CubeManager implements IRealizationProvider { updateCube(update); } - public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { + public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) + throws IOException { CubeInstance cubeCopy = cube.latestCopyForWrite(); CubeSegment[] segCopy = cube.regetSegments(optimizedSegments); - + for (CubeSegment seg : segCopy) { seg.setStatus(SegmentStatusEnum.READY_PENDING); } @@ -849,12 +855,12 @@ public class CubeManager implements IRealizationProvider { CubeSegment... 
optimizedSegments) throws IOException { CubeInstance cubeCopy = cube.latestCopyForWrite(); CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments); - + if (cubeCopy.getSegments().size() != optSegCopy.length * 2) { throw new IllegalStateException("For cube " + cubeCopy + ", every READY segment should be optimized and all segments should be READY before optimizing"); } - + CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length]; int i = 0; for (CubeSegment seg : optSegCopy) { @@ -865,7 +871,8 @@ public class CubeManager implements IRealizationProvider { "For cube " + cubeCopy + ", segment " + seg + " missing StorageLocationIdentifier"); if (StringUtils.isBlank(seg.getLastBuildJobID())) - throw new IllegalStateException("For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID"); + throw new IllegalStateException( + "For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID"); seg.setStatus(SegmentStatusEnum.READY); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e071c1cc/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index f7b8275..181e8b9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -370,7 +370,7 @@ public class BuildCubeWithStream { } catch (Throwable e) { logger.error("error", e); exitCode = 1; - } finally{ + } finally { if (buildCubeWithStream != null) { buildCubeWithStream.after(); buildCubeWithStream.cleanup();