Always save the cube segment first, even in the streaming case. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2008fb05 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2008fb05 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2008fb05
Branch: refs/heads/stream_m1 Commit: 2008fb0584790fbf5c8d6bb75316426b32197164 Parents: 5f2db8c Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Jun 17 16:42:36 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Jun 17 16:42:36 2016 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/kylin/cube/CubeManager.java | 12 +++++------- .../engine/streaming/cube/StreamingCubeBuilder.java | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2008fb05/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index f2d3d09..a33cf00 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -408,10 +408,10 @@ public class CubeManager implements IRealizationProvider { } public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { - return appendSegment(cube, startDate, endDate, startOffset, endOffset, true, true); + return appendSegment(cube, startDate, endDate, startOffset, endOffset, true); } - public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking, boolean saveChange) throws IOException { + public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException { if(strictChecking) checkNoBuildingSegment(cube); @@ -435,11 +435,9 @@ public class CubeManager implements IRealizationProvider { CubeSegment newSegment = 
newSegment(cube, startDate, endDate, startOffset, endOffset); validateNewSegments(cube, newSegment); - if (saveChange) { - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToAddSegs(newSegment); - updateCube(cubeBuilder); - } + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToAddSegs(newSegment); + updateCube(cubeBuilder); return newSegment; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2008fb05/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java index 3296b24..59bdc2d 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java @@ -115,7 +115,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder { CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); try { - CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0, false, false); + CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0, false); segment.setLastBuildJobID(segment.getUuid()); // give a fake job id segment.setInputRecords(streamingBatch.getMessages().size()); segment.setLastBuildTime(System.currentTimeMillis()); @@ -172,7 +172,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder { cubeSegment.setStatus(SegmentStatusEnum.READY); cubeSegment.setInputRecords(processedRowCount); 
CubeUpdate cubeBuilder = new CubeUpdate(cubeSegment.getCubeInstance()); - cubeBuilder.setToAddSegs(cubeSegment); + cubeBuilder.setToUpdateSegs(cubeSegment); try { CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cubeBuilder); } catch (IOException e) {