This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/realtime-streaming by this 
push:
     new 67101ce  KYLIN-3791 Map return by Maps.transformValues is a immutable 
view
67101ce is described below

commit 67101ce6c3f3ab8dcad855cd98e6c97deda063f0
Author: hit-lacus <hit_la...@126.com>
AuthorDate: Sat Jan 26 21:47:37 2019 +0800

    KYLIN-3791 Map return by Maps.transformValues is a immutable view
---
 .../stream/core/storage/StreamingSegmentManager.java   | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 28a294d..38c53fe 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -62,12 +62,18 @@ public class StreamingSegmentManager implements Closeable {
     private final String cubeName;
     private final CubeInstance cubeInstance;
 
-    //Cube window defines how streaming events are divided and put into 
different segments , for example 1 hour per segment(indexer).
-    //If the received events' timestamp is completely out of order and belongs 
to a very wide range, there will be multiple active segment indexers created 
and serve the indexing and querying.
+    /**
+     * Cube window defines how streaming events are divided and put into 
different segments , for example 1 hour per segment(indexer).
+     * If the received events' timestamp is completely out of order and 
belongs to a very wide range,
+     * there will be multiple active segment indexers created and serve the 
indexing and querying.
+     * */
     private final long cubeWindow;
 
-    //Cube duration defines how long the oldest streaming segment becomes 
immutable and does not allow additional modification.
-    //Any further long latency events that can't find a corresponding segment 
to serve the index, the events will be put to a specific segment for long 
latency events only.
+    /**
+     * Cube duration defines how long the oldest streaming segment becomes 
immutable and does not allow additional modification.
+     * Any further long latency events that can't find a corresponding segment 
to serve the index,
+     * the events will be put to a specific segment for long latency events 
only.
+     * */
     private final long cubeDuration;
 
     private final long maxCubeDuration;
@@ -223,13 +229,13 @@ public class StreamingSegmentManager implements Closeable 
{
     private void restoreSegmentsFromCP(List<File> segmentFolders, Map<Long, 
String> checkpointStoreStats,
                                        Map<Long, String> 
segmentSourceStartPositions, CubeSegment latestRemoteSegment) {
         if (segmentSourceStartPositions != null) {
-            this.segmentSourceStartPositions = 
Maps.transformValues(segmentSourceStartPositions, new Function<String, 
ISourcePosition>() {
+            
this.segmentSourceStartPositions.putAll(Maps.transformValues(segmentSourceStartPositions,
 new Function<String, ISourcePosition>() {
                 @Nullable
                 @Override
                 public ISourcePosition apply(@Nullable String input) {
                     return sourcePositionHandler.parsePosition(input);
                 }
-            });
+            }));
         }
         for (File segmentFolder : segmentFolders) {
             try {

Reply via email to