This is an automated email from the ASF dual-hosted git repository. magang pushed a commit to branch realtime-streaming in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this push: new 67101ce KYLIN-3791 Map return by Maps.transformValues is a immutable view 67101ce is described below commit 67101ce6c3f3ab8dcad855cd98e6c97deda063f0 Author: hit-lacus <hit_la...@126.com> AuthorDate: Sat Jan 26 21:47:37 2019 +0800 KYLIN-3791 Map return by Maps.transformValues is a immutable view --- .../stream/core/storage/StreamingSegmentManager.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java index 28a294d..38c53fe 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java @@ -62,12 +62,18 @@ public class StreamingSegmentManager implements Closeable { private final String cubeName; private final CubeInstance cubeInstance; - //Cube window defines how streaming events are divided and put into different segments , for example 1 hour per segment(indexer). - //If the received events' timestamp is completely out of order and belongs to a very wide range, there will be multiple active segment indexers created and serve the indexing and querying. + /** + * Cube window defines how streaming events are divided and put into different segments , for example 1 hour per segment(indexer). + * If the received events' timestamp is completely out of order and belongs to a very wide range, + * there will be multiple active segment indexers created and serve the indexing and querying. + * */ private final long cubeWindow; - //Cube duration defines how long the oldest streaming segment becomes immutable and does not allow additional modification. - //Any further long latency events that can't find a corresponding segment to serve the index, the events will be put to a specific segment for long latency events only. + /** + * Cube duration defines how long the oldest streaming segment becomes immutable and does not allow additional modification. + * Any further long latency events that can't find a corresponding segment to serve the index, + * the events will be put to a specific segment for long latency events only. + * */ private final long cubeDuration; private final long maxCubeDuration; @@ -223,13 +229,13 @@ public class StreamingSegmentManager implements Closeable { private void restoreSegmentsFromCP(List<File> segmentFolders, Map<Long, String> checkpointStoreStats, Map<Long, String> segmentSourceStartPositions, CubeSegment latestRemoteSegment) { if (segmentSourceStartPositions != null) { - this.segmentSourceStartPositions = Maps.transformValues(segmentSourceStartPositions, new Function<String, ISourcePosition>() { + this.segmentSourceStartPositions.putAll(Maps.transformValues(segmentSourceStartPositions, new Function<String, ISourcePosition>() { @Nullable @Override public ISourcePosition apply(@Nullable String input) { return sourcePositionHandler.parsePosition(input); } - }); + })); } for (File segmentFolder : segmentFolders) { try {