This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b936eb  minor, fix sonar reported bugs (#1368)
0b936eb is described below

commit 0b936eb16c7fb29d337f0e59759231030256ac28
Author: Shaofeng Shi <shaofeng...@apache.org>
AuthorDate: Sun Aug 23 16:32:45 2020 +0800

    minor, fix sonar reported bugs (#1368)
    
    * minor, fix sonar reported bugs
    
    * minor, fix sonar
    
    * minor, code format
---
 .../java/org/apache/kylin/common/util/Bytes.java   |  2 +-
 .../cube/inmemcubing/AbstractInMemCubeBuilder.java |  8 +--
 .../cube/inmemcubing/ICuboidGTTableWriter.java     |  8 +--
 .../kylin/cube/inmemcubing/InMemCubeBuilder.java   | 11 +--
 .../gridtable/GTAggregateTransformScanner.java     | 37 ++++++++--
 .../gridtable/GTTwoLayerAggregateScanner.java      |  2 +
 .../kylin/cube/inmemcubing/MemDiskStoreTest.java   | 11 +--
 .../kylin/gridtable/SimpleGridTableTest.java       | 84 +++++++++++-----------
 .../apache/kylin/dict/GlobalDictionaryBuilder.java | 11 +--
 .../kylin/metadata/streaming/StreamingConfig.java  |  2 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  | 26 +++----
 .../engine/mr/common/MapReduceExecutable.java      | 14 ++--
 .../mr/streaming/ColumnarSplitDataReader.java      |  2 +-
 .../kylin/engine/mr/streaming/DictsReader.java     | 44 +++++-------
 .../kylin/engine/mr/streaming/RowRecordReader.java | 41 +++++------
 .../kylin/realtime/BuildCubeWithStreamV2.java      |  2 +-
 .../apache/kylin/query/relnode/OLAPContext.java    | 12 ++--
 .../coprocessor/endpoint/CubeVisitServiceTest.java | 25 +++----
 .../columnar/ColumnarMemoryStorePersister.java     | 12 ++--
 .../storage/columnar/ColumnarStoreDimDesc.java     | 13 ++--
 .../storage/columnar/ColumnarStoreMetricsDesc.java | 13 ++--
 .../columnar/FSInputGeneralColumnDataReader.java   |  6 +-
 .../storage/columnar/FragmentCuboidReader.java     | 12 ++--
 .../core/storage/columnar/SegmentMemoryStore.java  | 29 ++++----
 .../compress/FSInputLZ4CompressedColumnReader.java | 13 ++--
 .../compress/FSInputNoCompressedColumnReader.java  | 18 ++---
 .../compress/FSInputRLECompressedColumnReader.java | 12 +++-
 .../compress/LZ4CompressedColumnReader.java        |  8 +--
 .../compress/NoCompressedColumnReader.java         |  4 ++
 .../compress/RunLengthCompressedColumnReader.java  |  5 ++
 .../kylin/stream/core/util/RecordsSerializer.java  |  1 +
 .../columnar/compress/LZ4CompressColumnTest.java   |  4 +-
 .../columnar/compress/NoCompressColumnTest.java    |  5 +-
 .../compress/RunLengthCompressColumnTest.java      |  4 +-
 .../kylin/tool/query/ProbabilityGenerator.java     |  4 ++
 35 files changed, 273 insertions(+), 232 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
index 65f6fef..2f13fba 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
@@ -71,7 +71,7 @@ public class Bytes {
     /**
      * Size of boolean in bytes
      */
-    public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+    public static final int SIZEOF_BOOLEAN = 1;
 
     /**
      * Size of byte in bytes
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index df1fa7a..89c2cd8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -101,11 +101,11 @@ abstract public class AbstractInMemCubeBuilder {
     protected void outputCuboid(long cuboidId, GridTable gridTable, 
ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();
         GTScanRequest req = new 
GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            output.write(cuboidId, record);
+        try (IGTScanner scanner = gridTable.scan(req)) {
+            for (GTRecord record : scanner) {
+                output.write(cuboidId, record);
+            }
         }
-        scanner.close();
         logger.debug("Cuboid " + cuboidId + " output takes " + 
(System.currentTimeMillis() - startTime) + "ms");
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
index 93a7994..5c32d6c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -37,11 +37,11 @@ public abstract class ICuboidGTTableWriter implements 
ICuboidWriter{
     public void write(long cuboidId, GridTable gridTable) throws IOException {
         long startTime = System.currentTimeMillis();
         GTScanRequest req = new 
GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            write(cuboidId, record);
+        try (IGTScanner scanner = gridTable.scan(req)) {
+            for (GTRecord record : scanner) {
+                write(cuboidId, record);
+            }
         }
-        scanner.close();
         logger.info("Cuboid " + cuboidId + " output takes " + 
(System.currentTimeMillis() - startTime) + "ms");
     }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 403ec27..eabad14 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -445,15 +445,14 @@ public class InMemCubeBuilder extends 
AbstractInMemCubeBuilder {
         long startTime = System.currentTimeMillis();
         logger.info("Calculating cuboid {}", cuboidId);
 
-        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, 
parentId, cuboidId, aggregationColumns, measureColumns);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
-        GTBuilder builder = newGridTable.rebuild();
 
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
         ImmutableBitSet allNeededColumns = 
aggregationColumns.or(measureColumns);
 
         GTRecord newRecord = new GTRecord(newGridTable.getInfo());
         int count = 0;
-        try {
+        try (GTAggregateScanner scanner = 
prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, 
measureColumns);
+             GTBuilder builder = newGridTable.rebuild()) {
             for (GTRecord record : scanner) {
                 count++;
                 for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
@@ -462,10 +461,6 @@ public class InMemCubeBuilder extends 
AbstractInMemCubeBuilder {
                 }
                 builder.write(newRecord);
             }
-
-        } finally {
-            scanner.close();
-            builder.close();
         }
 
         long timeSpent = System.currentTimeMillis() - startTime;
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
index 0f805e8..c59ae36 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
@@ -38,6 +38,7 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
     private final GTScanRequest req;
     private final GTTwoLayerAggregateParam twoLayerAggParam;
     private long inputRowCount = 0L;
+    private Iterator<GTRecord> iterator = null;
 
     public GTAggregateTransformScanner(IGTScanner inputScanner, GTScanRequest 
req) {
         this.inputScanner = inputScanner;
@@ -57,9 +58,13 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        return twoLayerAggParam.satisfyPrefix(req.getDimensions())
-                ? new 
FragmentTransformGTRecordIterator(inputScanner.iterator())
-                : new NormalTransformGTRecordIterator(inputScanner.iterator());
+        if (iterator == null) {
+            iterator = twoLayerAggParam.satisfyPrefix(req.getDimensions())
+                    ? new 
FragmentTransformGTRecordIterator(inputScanner.iterator())
+                    : new 
NormalTransformGTRecordIterator(inputScanner.iterator());
+        }
+
+        return iterator;
     }
 
     public long getInputRowCount() {
@@ -75,6 +80,7 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
     private class FragmentTransformGTRecordIterator extends 
TransformGTRecordIterator {
         private final PrefixFragmentIterator fragmentIterator;
 
+        private GTAggregateScanner aggregateScanner;
         private Iterator<GTRecord> transformedFragment = null;
 
         FragmentTransformGTRecordIterator(Iterator<GTRecord> input) {
@@ -88,6 +94,12 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
             }
 
             if (!fragmentIterator.hasNext()) {
+                // close resource
+                try {
+                    aggregateScanner.close();
+                } catch (IOException e) {
+                    // do nothing
+                }
                 return false;
             }
 
@@ -106,7 +118,8 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
                     return fragmentIterator.next();
                 }
             };
-            transformedFragment = new GTAggregateScanner(fragmentScanner, 
innerReq).iterator();
+            aggregateScanner = new GTAggregateScanner(fragmentScanner, 
innerReq);
+            transformedFragment = aggregateScanner.iterator();
 
             return hasNext();
         }
@@ -124,6 +137,7 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
     private class NormalTransformGTRecordIterator extends 
TransformGTRecordIterator {
 
         private final Iterator<GTRecord> aggRecordIterator;
+        private final GTAggregateScanner aggregateScanner;
 
         NormalTransformGTRecordIterator(final Iterator<GTRecord> input) {
             IGTScanner gtScanner = new IGTScanner() {
@@ -134,6 +148,7 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
 
                 @Override
                 public void close() throws IOException {
+                    // do nothing
                 }
 
                 @Override
@@ -141,13 +156,21 @@ public class GTAggregateTransformScanner implements 
IGTScanner {
                     return input;
                 }
             };
-
-            aggRecordIterator = new GTAggregateScanner(gtScanner, 
innerReq).iterator();
+            aggregateScanner = new GTAggregateScanner(gtScanner, innerReq);
+            aggRecordIterator = aggregateScanner.iterator();
         }
 
         @Override
         public boolean hasNext() {
-            return aggRecordIterator.hasNext();
+            boolean hasNext = aggRecordIterator.hasNext();
+            if (!hasNext) {
+                try {
+                    aggregateScanner.close();
+                } catch (IOException e) {
+                    // do nothing
+                }
+            }
+            return hasNext;
         }
 
         @Override
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
index 5df96e2..58b5c1e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
@@ -46,6 +46,8 @@ public class GTTwoLayerAggregateScanner implements IGTScanner 
{
     @Override
     public void close() throws IOException {
         inputScanner.close();
+        secondLayerInputScanner.close();
+        outputScanner.close();
     }
 
     @Override
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index f457ea6..bd147bf 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -102,11 +102,12 @@ public class MemDiskStoreTest extends 
LocalFileMetadataTestCase {
         }
         builder.close();
 
-        IGTScanner scanner = table.scan(new 
GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
+        try (IGTScanner scanner = table.scan(new 
GTScanRequestBuilder().setInfo(info).setRanges(null)
+                
.setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
         }
-        scanner.close();
     }
 }
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 14a25c5..6f3de1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -51,7 +51,7 @@ public class SimpleGridTableTest extends 
LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
     @Test
@@ -61,7 +61,7 @@ public class SimpleGridTableTest extends 
LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
     @Test
@@ -71,7 +71,7 @@ public class SimpleGridTableTest extends 
LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scanAndAggregate(table);
+        scanAndAggregate(table);
     }
 
     @Test
@@ -81,55 +81,53 @@ public class SimpleGridTableTest extends 
LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         rebuildViaAppend(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
-    private IGTScanner scan(GridTable table) throws IOException {
+    private void scan(GridTable table) throws IOException {
         GTScanRequest req = new 
GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
-        for (GTRecord r : scanner) {
-            Object[] v = r.getValues();
-            assertTrue(((String) v[0]).startsWith("2015-"));
-            assertTrue(((String) v[2]).equals("Food"));
-            assertTrue(((Long) v[3]).longValue() == 10);
-            assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
-            System.out.println(r);
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                assertTrue(((String) v[0]).startsWith("2015-"));
+                assertTrue(((String) v[2]).equals("Food"));
+                assertTrue(((Long) v[3]).longValue() == 10);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                System.out.println(r);
+            }
         }
-        scanner.close();
-        return scanner;
     }
 
-    private IGTScanner scanAndAggregate(GridTable table) throws IOException {
+    private void scanAndAggregate(GridTable table) throws IOException {
         GTScanRequest req = new 
GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0,
 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[] { "count", 
"sum" }).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
-        int i = 0;
-        for (GTRecord r : scanner) {
-            Object[] v = r.getValues();
-            switch (i) {
-            case 0:
-                assertTrue(((Long) v[3]).longValue() == 20);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
-                break;
-            case 1:
-                assertTrue(((Long) v[3]).longValue() == 30);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
-                break;
-            case 2:
-                assertTrue(((Long) v[3]).longValue() == 40);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
-                break;
-            case 3:
-                assertTrue(((Long) v[3]).longValue() == 10);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
-                break;
-            default:
-                fail();
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                switch (i) {
+                    case 0:
+                        assertTrue(((Long) v[3]).longValue() == 20);
+                        assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+                        break;
+                    case 1:
+                        assertTrue(((Long) v[3]).longValue() == 30);
+                        assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+                        break;
+                    case 2:
+                        assertTrue(((Long) v[3]).longValue() == 40);
+                        assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+                        break;
+                    case 3:
+                        assertTrue(((Long) v[3]).longValue() == 10);
+                        assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                        break;
+                    default:
+                        fail();
+                }
+                i++;
+                System.out.println(r);
             }
-            i++;
-            System.out.println(r);
         }
-        scanner.close();
-        return scanner;
     }
 
     static GTBuilder rebuild(GridTable table) throws IOException {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index d813793..ba31e2b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -46,9 +46,6 @@ public class GlobalDictionaryBuilder implements 
IDictionaryBuilder {
     @Override
     public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) 
throws IOException {
         sourceColumn = dictInfo.getSourceTable() + "_" + 
dictInfo.getSourceColumn();
-        lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
-        lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);
-
         int maxEntriesPerSlice = 
KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
         if (hdfsDir == null) {
             //build in Kylin job server
@@ -56,12 +53,18 @@ public class GlobalDictionaryBuilder implements 
IDictionaryBuilder {
         }
         String baseDir = hdfsDir + "resources/GlobalDict" + 
dictInfo.getResourceDir() + "/";
 
+        lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+        String lockPath = getLockPath(sourceColumn);
         try {
+            lock.lock(lockPath, Long.MAX_VALUE);
             this.builder = new AppendTrieDictionaryBuilder(baseDir, 
maxEntriesPerSlice, true);
         } catch (Throwable e) {
-            lock.unlock(getLockPath(sourceColumn));
             throw new RuntimeException(
                     String.format(Locale.ROOT, "Failed to create global 
dictionary on %s ", sourceColumn), e);
+        } finally {
+            if (lock.isLockedByMe(lockPath)) {
+                lock.unlock(lockPath);
+            }
         }
         this.baseId = baseId;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
index 335d3c8..3de65ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -38,7 +38,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class StreamingConfig extends RootPersistentEntity {
 
-    public static Serializer<StreamingConfig> SERIALIZER = new 
JsonSerializer<StreamingConfig>(StreamingConfig.class);
+    private static final Serializer<StreamingConfig> SERIALIZER = new 
JsonSerializer<>(StreamingConfig.class);
 
     public static final String STREAMING_TYPE_KAFKA = "kafka";
 
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index c49edc7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -364,12 +364,12 @@ public class DictGridTableTest extends 
LocalFileMetadataTestCase {
         long start = System.currentTimeMillis();
         GTScanRequest req = new 
GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
                 .setFilterPushDown(filter).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
         int i = 0;
-        for (GTRecord r : scanner) {
-            i++;
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                i++;
+            }
         }
-        scanner.close();
         long end = System.currentTimeMillis();
         System.out.println(
                 (end - start) + "ms with filter cache enabled=" + 
FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
@@ -555,17 +555,17 @@ public class DictGridTableTest extends 
LocalFileMetadataTestCase {
 
     private void doScanAndVerify(GridTable table, GTScanRequest req, String... 
verifyRows) throws IOException {
         System.out.println(req);
-        IGTScanner scanner = table.scan(req);
-        int i = 0;
-        for (GTRecord r : scanner) {
-            System.out.println(r);
-            if (verifyRows == null || i >= verifyRows.length) {
-                Assert.fail();
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                System.out.println(r);
+                if (verifyRows == null || i >= verifyRows.length) {
+                    Assert.fail();
+                }
+                assertEquals(verifyRows[i], r.toString());
+                i++;
             }
-            assertEquals(verifyRows[i], r.toString());
-            i++;
         }
-        scanner.close();
     }
 
     public static ByteArray enc(GTInfo info, int col, String value) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4dd68a5..62fb814 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -371,17 +371,17 @@ public class MapReduceExecutable extends 
AbstractExecutable {
         long lockStartTime = System.currentTimeMillis();
 
         boolean isLockedByTheJob = lock.isLocked(fullLockPath);
-        logger.info("cube job {} zk lock is isLockedByTheJob:{}", getId(), 
isLockedByTheJob);
-        if (!isLockedByTheJob) {//not lock by the job
+        logger.info("cube job {} zk lock is isLockedByTheJob: {}", getId(), 
isLockedByTheJob);
+        if (!isLockedByTheJob) { //not lock by the job
             while (isLockedByOther) {
-                isLockedByOther = 
lock.isLocked(getCubeJobLockParentPathName());//other job global lock
+                isLockedByOther = 
lock.isLocked(getCubeJobLockParentPathName()); //other job global lock
 
-                if (!isLockedByOther) {//not lock by other job
+                if (!isLockedByOther) { //not lock by other job
                     isLockedByOther = lock.isLocked(ephemeralLockPath);//check 
the ephemeral current lock
                     logger.info("zookeeper lock path :{}, is locked by other 
job result is {}", ephemeralLockPath,
                             isLockedByOther);
 
-                    if (!isLockedByOther) {//the ephemeral lock not lock by 
other job
+                    if (!isLockedByOther) { //the ephemeral lock not lock by 
other job
                         //try to get ephemeral lock
                         try {
                             logger.debug("{} before start to get lock 
ephemeralLockPath {}", getId(),
@@ -413,12 +413,12 @@ public class MapReduceExecutable extends 
AbstractExecutable {
                                 }
                             }
                         }
-                        isLockedByOther = true;//get lock fail,will try again
+                        isLockedByOther = true;//get lock fail, will try again
                     }
                 }
                 // wait 1 min and try again
                 logger.info(
-                        "{}, parent lock path({}) is locked by other job 
result is {} ,ephemeral lock path :{} is locked by other job result is {},will 
try after one minute",
+                        "{}, parent lock path({}) is locked by other job 
result is {} ,ephemeral lock path: {} is locked by other job result is {}, will 
try after one minute",
                         getId(), getCubeJobLockParentPathName(), 
isLockedByOther, ephemeralLockPath, isLockedByOther);
                 Thread.sleep(60000);
             }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
index d426cfc..9d33119 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
@@ -52,7 +52,7 @@ public class ColumnarSplitDataReader extends 
ColumnarSplitReader {
         init(inputSplit, context);
     }
 
-    public void init(InputSplit split, TaskAttemptContext context) throws 
IOException, InterruptedException {
+    public void init(InputSplit split, TaskAttemptContext context) throws 
IOException {
         baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, 
baseCuboid);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
index b543c8b..7aa9d19 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
@@ -36,43 +36,33 @@ import 
org.apache.kylin.shaded.com.google.common.collect.ImmutableMap;
 
 public class DictsReader extends ColumnarFilesReader {
     private static final Logger logger = 
LoggerFactory.getLogger(DictsReader.class);
-    private FSDataInputStream metaInputStream;
-    private FSDataInputStream dataInputStream;
 
     public DictsReader(Path path, FileSystem fileSystem) throws IOException {
         super(fileSystem, path);
     }
 
     public ImmutableMap<String, Dictionary> readDicts() throws IOException {
-        metaInputStream = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = 
JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
-        List<DimDictionaryMetaInfo> dimDictMetaInfos = 
fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = 
ImmutableMap.<String, Dictionary> builder();
-        dataInputStream = fs.open(dataFilePath);
-        Dictionary dict;
-        String colName;
-        logger.info("Reading dictionary from {}", dataFilePath.getName());
-        for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
-            dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            logger.info("Add dict for {}", colName);
-            builder.put(colName, dict);
+        try (FSDataInputStream metaInputStream = fs.open(metaFilePath);
+             FSDataInputStream dataInputStream = fs.open(dataFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = 
JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
+            List<DimDictionaryMetaInfo> dimDictMetaInfos = 
fragmentMetaInfo.getDimDictionaryMetaInfos();
+            ImmutableMap.Builder<String, Dictionary> builder = 
ImmutableMap.builder();
+            Dictionary dict;
+            String colName;
+            logger.info("Reading dictionary from {}", dataFilePath.getName());
+            for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
+                dataInputStream.seek(dimDictMetaInfo.getStartOffset());
+                dict = DictionarySerializer.deserialize(dataInputStream);
+                colName = dimDictMetaInfo.getDimName();
+                logger.info("Add dict for {}", colName);
+                builder.put(colName, dict);
+            }
+            return builder.build();
         }
-        return builder.build();
     }
 
     @Override
     public void close() throws IOException{
-        try {
-            if (metaInputStream != null) {
-                metaInputStream.close();
-            }
-            if (dataInputStream != null) {
-                dataInputStream.close();
-            }
-        } catch (IOException e) {
-            logger.error("close file error", e);
-        }
+
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
index 2206fd7..1e610bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
@@ -82,24 +82,25 @@ public class RowRecordReader extends ColumnarFilesReader {
     }
 
     public void initReaders() throws IOException {
-        FSDataInputStream in = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
-        CuboidMetaInfo basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
-        FSDataInputStream dictInputStream = fs.open(dataFilePath);
+        List<DimensionMetaInfo> allDimensions;
+        CuboidMetaInfo basicCuboidMetaInfo;
+        Map<String, DimensionEncoding> dimensionEncodingMap;
+        try (FSDataInputStream in = fs.open(metaFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
+            basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
+            allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
+            dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions, dataFilePath);
+        }
 
-        List<DimensionMetaInfo> allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
-        Map<String, DimensionEncoding> dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions,
-                dictInputStream);
         dimensionColumnReaders = Lists.newArrayList();
         dimensionColumnReaderItrs = Lists.newArrayList();
         dimensionEncodings = Lists.newArrayList();
         for (DimensionMetaInfo dimensionMetaInfo : allDimensions) {
-            FSDataInputStream dimInputStream = fs.open(dataFilePath);
             String dimName = dimensionMetaInfo.getName();
             DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName);
             ColumnarStoreDimDesc dimDesc = new ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(),
                     dimensionMetaInfo.getCompressionType());
-            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(dimInputStream,
+            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(fs, dataFilePath,
                     dimensionMetaInfo.getStartOffset(), dimensionMetaInfo.getDataLength(),
                     (int) basicCuboidMetaInfo.getNumberOfRows());
             dimensionColumnReaders.add(dimDataReader);
@@ -112,13 +113,13 @@ public class RowRecordReader extends ColumnarFilesReader {
         metricsColumnReaderItrs = Lists.newArrayList();
         metricsDataTransformers = Lists.newArrayList();
         for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) {
-            FSDataInputStream metricsInputStream = fs.open(dataFilePath);
+
             MeasureDesc measure = findMeasure(metricMetaInfo.getName());
             DataType metricsDataType = measure.getFunction().getReturnDataType();
             ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
             ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
                     metricMetaInfo.getCompressionType());
-            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(fs, dataFilePath,
                     metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
                     (int) basicCuboidMetaInfo.getNumberOfRows());
             metricsColumnReaders.add(metricsDataReader);
@@ -140,8 +141,11 @@ public class RowRecordReader extends ColumnarFilesReader {
     }
 
     private Map<String, DimensionEncoding> getDimensionEncodings(FragmentMetaInfo fragmentMetaInfo,
-            List<DimensionMetaInfo> allDimensions, FSDataInputStream dictInputStream) throws IOException {
-        Map<String, Dictionary> dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+            List<DimensionMetaInfo> allDimensions, Path dataPath) throws IOException {
+        Map<String, Dictionary> dictionaryMap;
+        try (FSDataInputStream dictInputStream = fs.open(dataPath)) {
+            dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+        }
 
         Map<String, DimensionEncoding> result = Maps.newHashMap();
         for (DimensionMetaInfo dimension : allDimensions) {
@@ -152,7 +156,7 @@ public class RowRecordReader extends ColumnarFilesReader {
                 Dictionary<String> dict = dictionaryMap.get(dimension.getName());
                 if (dict == null) {
                     logger.error("No dictionary found for dict-encoding column " + col);
-                    throw new RuntimeException("No dictionary found for dict-encoding column " + col);
+                    throw new IllegalStateException("No dictionary found for dict-encoding column " + col);
                 } else {
                     result.put(dimension.getName(), new DictionaryDimEnc(dict));
                 }
@@ -169,14 +173,11 @@ public class RowRecordReader extends ColumnarFilesReader {
     public Map<String, Dictionary> readAllDimensionsDictionary(FragmentMetaInfo fragmentMetaInfo,
             FSDataInputStream dataInputStream) throws IOException {
         List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.<String, Dictionary> builder();
-        Dictionary dict;
-        String colName;
+        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.builder();
         for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
             dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            builder.put(colName, dict);
+            Dictionary dict = DictionarySerializer.deserialize(dataInputStream);
+            builder.put(dimDictMetaInfo.getDimName(), dict);
         }
         return builder.build();
     }
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index 781c62a..fb85b26 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -249,7 +249,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
         }
 
         if (consumeDataDone == false) {
-            throw new IllegalStateException("Exec timeout, data not be comsumed completely"); // ensure all messages have been comsumed.
+            throw new IllegalStateException("Exec timeout, data not be consumed completely"); // ensure all messages have been comsumed.
         }
 
         waittime = 0;
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 423a4f0..2f3ad4c 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -67,9 +67,9 @@ public class OLAPContext {
     public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult";
     public static final String PRM_USER_AUTHEN_INFO = "UserAuthenInfo";
 
-    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<Map<String, String>>();
+    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<>();
 
-    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<Map<Integer, OLAPContext>>();
+    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<>();
 
     public static void setParameters(Map<String, String> parameters) {
         _localPrarameters.set(parameters);
@@ -81,7 +81,7 @@ public class OLAPContext {
 
     public static void registerContext(OLAPContext ctx) {
         if (_localContexts.get() == null) {
-            Map<Integer, OLAPContext> contextMap = new HashMap<Integer, OLAPContext>();
+            Map<Integer, OLAPContext> contextMap = new HashMap<>();
             _localContexts.set(contextMap);
         }
         _localContexts.get().put(ctx.id, ctx);
@@ -147,7 +147,7 @@ public class OLAPContext {
 
     public Set<TblColRef> allColumns = new HashSet<>();
     public List<TblColRef> groupByColumns = new ArrayList<>();
-    public Set<TblColRef> subqueryJoinParticipants = new HashSet<TblColRef>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
+    public Set<TblColRef> subqueryJoinParticipants = new HashSet<>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
     public Set<TblColRef> metricsColumns = new HashSet<>();
     public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply
     public List<TblColRef> aggrOutCols = new ArrayList<>(); // aggregation output (inner) columns
@@ -175,7 +175,7 @@ public class OLAPContext {
     public OLAPAuthentication olapAuthen = new OLAPAuthentication();
 
     public boolean isSimpleQuery() {
-        return (joins.size() == 0) && (groupByColumns.size() == 0) && (aggregations.size() == 0);
+        return (joins.isEmpty()) && (groupByColumns.isEmpty()) && (aggregations.isEmpty());
     }
 
     SQLDigest sqlDigest;
@@ -343,7 +343,7 @@ public class OLAPContext {
     // ============================================================================
 
     public interface IAccessController {
-        public void check(List<OLAPContext> contexts, KylinConfig config) throws IllegalStateException;
+        void check(List<OLAPContext> contexts, KylinConfig config);
     }
 
 }
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
index 6650e6d..dfae455 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
@@ -137,18 +137,19 @@ public class CubeVisitServiceTest extends LocalFileMetadataTestCase {
 
         gtInfo = newInfo();
         GridTable gridTable = newTable(gtInfo);
-        IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
-                .setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        for (GTRecord record : scanner) {
-            byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
-            byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
-            System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
-                    RowConstants.ROWKEY_CUBOIDID_LEN);
-            System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
-            Put put = new Put(key);
-            put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
-            put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
-            region.put(put);
+        try (IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+            for (GTRecord record : scanner) {
+                byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
+                byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
+                System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
+                        RowConstants.ROWKEY_CUBOIDID_LEN);
+                System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
+                Put put = new Put(key);
+                put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
+                put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
+                region.put(put);
+            }
         }
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
index 8114360..02a6ad6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.Nullable;
 
@@ -125,7 +125,7 @@ public class ColumnarMemoryStorePersister {
         FileOutputStream fragmentOutputStream = FileUtils.openOutputStream(fragment.getDataFile());
 
         try (CountingOutputStream fragmentOut = new CountingOutputStream(new BufferedOutputStream(fragmentOutputStream))) {
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
+            ConcurrentMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
             List<List<Object>> basicCuboidColumnarValues = transformToColumnar(baseCuboidId, dimensions.length,
                     basicCuboidData);
             // persist dictionaries
@@ -139,13 +139,13 @@ public class ColumnarMemoryStorePersister {
             long totalRowCnt = basicCuboidMeta.getNumberOfRows();
 
             // persist additional cuboids
-            Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
+            Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
                     .getAdditionalCuboidsData();
             if (additionalCuboidsData != null && additionalCuboidsData.size() > 0) {
-                for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
+                for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
                         .entrySet()) {
                     CuboidInfo cuboidInfo = cuboidDataEntry.getKey();
-                    ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
+                    ConcurrentMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
                     List<List<Object>> cuboidColumnarValues = transformToColumnar(cuboidInfo.getCuboidID(),
                             cuboidInfo.getDimCount(), cuboidData);
                     CuboidMetaInfo cuboidMeta = persistCuboidData(cuboidInfo.getCuboidID(), cuboidInfo.getDimensions(),
@@ -171,7 +171,7 @@ public class ColumnarMemoryStorePersister {
      * @return
      */
     private List<List<Object>> transformToColumnar(long cuboidId, int dimCnt,
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> aggBufMap) {
+            ConcurrentMap<String[], MeasureAggregator[]> aggBufMap) {
         Stopwatch stopwatch = Stopwatch.createUnstarted();
         stopwatch.start();
         int columnsNum = dimCnt + measures.length;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
index 971d137..b762c7e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
@@ -22,7 +22,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
@@ -81,14 +82,14 @@ public class ColumnarStoreDimDesc {
         return new NoCompressedColumnReader(dataBuffer, columnDataStartOffset, columnDataLength / rowCount, rowCount);
     }
 
-    public ColumnDataReader getDimReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getDimReaderFromFSInput(FileSystem fs, Path file, int columnDataStartOffset,
+                                                    int columnDataLength, int rowCount) throws IOException {
         if (compression == Compression.LZ4) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
         } else if (compression == Compression.RUN_LENGTH) {
-            return new FSInputRLECompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputRLECompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
         }
-        return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+        return new FSInputNoCompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength / rowCount,
                 rowCount);
     }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
index eca5ae7..a3f8de9 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputLZ4CompressedColumnReader;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputNoCompressedColumnReader;
@@ -74,16 +75,16 @@ public class ColumnarStoreMetricsDesc {
         return new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength);
     }
 
-    public ColumnDataReader getMetricsReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getMetricsReaderFromFSInput(FileSystem fs, Path filePath, int columnDataStartOffset,
+                                                        int columnDataLength, int rowCount) throws IOException {
         if (Compression.LZ4 == compression && fixLen != -1) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength, rowCount);
         }
         if (fixLen != -1) {
-            return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+            return new FSInputNoCompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength / rowCount,
                     rowCount);
         }
-        return new FSInputGeneralColumnDataReader(inputStream, columnDataStartOffset, columnDataLength);
+        return new FSInputGeneralColumnDataReader(fs, filePath, columnDataStartOffset, columnDataLength);
     }
 
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
index 5282533..363886b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
@@ -23,14 +23,16 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public class FSInputGeneralColumnDataReader implements ColumnDataReader {
     private FSDataInputStream fsInputStream;
     private int numOfVals;
 
-    public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
+    public FSInputGeneralColumnDataReader(FileSystem fs, Path file, int dataStartOffset, int dataLength)
             throws IOException {
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         fsInputStream.seek(dataStartOffset + dataLength - 4L);
         this.numOfVals = fsInputStream.readInt();
         fsInputStream.seek(dataStartOffset);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
index 2b5621b..432cbbf 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -39,14 +40,11 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
     private long readRowCount = 0;
     private int dimCnt;
     private int metricCnt;
-
-    private CubeDesc cubeDesc;
     private ColumnDataReader[] dimensionDataReaders;
     private ColumnDataReader[] metricDataReaders;
 
     public FragmentCuboidReader(CubeDesc cubeDesc, FragmentData fragmentData, CuboidMetaInfo cuboidMetaInfo,
             TblColRef[] dimensions, MeasureDesc[] measures, DimensionEncoding[] dimEncodings) {
-        this.cubeDesc = cubeDesc;
         this.dimCnt = dimensions.length;
         this.metricCnt = measures.length;
         this.dimensionDataReaders = new ColumnDataReader[dimCnt];
@@ -127,14 +125,14 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
         return new Iterator<RawRecord>() {
             @Override
             public boolean hasNext() {
-                if (readRowCount >= rowCount) {
-                    return false;
-                }
-                return true;
+                return readRowCount < rowCount;
             }
 
             @Override
             public RawRecord next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
                 for (int i = 0; i < dimensionDataReaders.length; i++) {
                     oneRawRecord.setDimension(i, dimValItr[i].next());
                 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
index 2bf9127..24c107d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -64,8 +66,8 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
     protected final ParsedStreamingCubeInfo parsedStreamingCubeInfo;
     protected final String segmentName;
 
-    private volatile Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
-    private volatile ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
+    private volatile Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
+    private volatile ConcurrentMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
 
     private volatile AtomicInteger rowCount = new AtomicInteger();
     private volatile AtomicInteger originRowCount = new AtomicInteger();
@@ -84,10 +86,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
 
         this.basicCuboidAggBufMap = new ConcurrentSkipListMap<>(StringArrayComparator.INSTANCE);
         List<CuboidInfo> additionalCuboids = parsedStreamingCubeInfo.getAdditionalCuboidsToBuild();
-        if (additionalCuboids != null && additionalCuboids.size() > 0) {
+        if (additionalCuboids != null && !additionalCuboids.isEmpty()) {
             this.cuboidsAggBufMap = new ConcurrentHashMap<>(additionalCuboids.size());
             for (CuboidInfo cuboidInfo : additionalCuboids) {
-                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<String[], MeasureAggregator[]>(
+                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<>(
                         StringArrayComparator.INSTANCE));
             }
         }
@@ -107,10 +109,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
         Object[] metricsValues = buildValue(row);
         aggregate(basicCuboidAggBufMap, basicCuboidDimensions, metricsValues);
         if (cuboidsAggBufMap != null) {
-            for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
+            for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
                     .entrySet()) {
                 CuboidInfo cuboidInfo = cuboidAggEntry.getKey();
-                ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
+                ConcurrentMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
                 String[] cuboidDimensions = buildCuboidKey(cuboidInfo, row);
                 aggregate(cuboidAggMap, cuboidDimensions, metricsValues);
             }
@@ -172,7 +174,7 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
     }
 
     @SuppressWarnings("unchecked")
-    private void aggregate(ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
+    private void aggregate(ConcurrentMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
             Object[] metricsValues) {
         MeasureAggregator[] aggrs = cuboidAggBufMap.get(dimensions);
         if (aggrs != null) {
@@ -217,20 +219,20 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
         return originRowCount.get();
     }
 
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getBasicCuboidData() {
+    public ConcurrentMap<String[], MeasureAggregator[]> getBasicCuboidData() {
         return basicCuboidAggBufMap;
     }
 
-    public Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
+    public Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
         return cuboidsAggBufMap;
     }
 
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
+    public ConcurrentMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
         if (cuboidID == parsedStreamingCubeInfo.basicCuboid.getId()) {
             return basicCuboidAggBufMap;
         } else {
             CuboidInfo cuboidInfo = new CuboidInfo(cuboidID);
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
+            ConcurrentMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
             if (result != null) {
                 return result;
             }
@@ -357,6 +359,9 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
 
                 @Override
                 public Record next() {
+                    if (nextEntry == null)
+                        throw new NoSuchElementException();
+                    
                     try {
                         String[] allDimensions = nextEntry.getKey();
                         MeasureAggregator<?>[] allMetrics = nextEntry.getValue();
@@ -389,8 +394,6 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
                         System.arraycopy(aggrResult, 0, oneRecord.getMetrics(), 0, aggrResult.length);
                         count++;
                         return oneRecord;
-                    } catch (RuntimeException exception) {
-                        throw exception;
                     } finally {
                         nextEntry = null;
                     }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
index 0de561e..a00134f 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 import net.jpountz.lz4.LZ4Factory;
@@ -40,10 +42,10 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
     private LZ4SafeDecompressor deCompressor;
     private FSDataInputStream fsInputStream;
 
-    public FSInputLZ4CompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public FSInputLZ4CompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+                                            int columnDataLength, int rowCount) throws IOException {
         this.rowCount = rowCount;
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         fsInputStream.seek(footStartOffset);
         this.numValInBlock = fsInputStream.readInt();
@@ -86,11 +88,14 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
                 try {
                     loadNextBuffer();
                 } catch (IOException e) {
-                    throw new NoSuchElementException("error when read data");
+                    throw new NoSuchElementException("error when read data " + e.getMessage());
                 }
             }
             byte[] readBuffer = new byte[valLen];
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
index c6255cd..4f1ce8d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
@@ -20,23 +20,22 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 public class FSInputNoCompressedColumnReader implements ColumnDataReader {
     private FSDataInputStream fsInputStream;
     private byte[] readBuffer;
-    private int colDataStartOffset;
-    private int colValLength;
     private int rowCount;
 
-    public FSInputNoCompressedColumnReader(FSDataInputStream fsInputStream, int colDataStartOffset, int colValLength,
-            int rowCount) throws IOException {
-        this.fsInputStream = fsInputStream;
-        this.colDataStartOffset = colDataStartOffset;
+    public FSInputNoCompressedColumnReader(FileSystem fs, Path file, int colDataStartOffset, int colValLength,
+                                           int rowCount) throws IOException {
+        this.fsInputStream = fs.open(file);
         fsInputStream.seek(colDataStartOffset);
-        this.colValLength = colValLength;
         this.rowCount = rowCount;
         this.readBuffer = new byte[colValLength];
     }
@@ -65,10 +64,13 @@ public class FSInputNoCompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             try {
                 fsInputStream.readFully(readBuffer);
             } catch (IOException e) {
-                throw new RuntimeException("error when read data", e);
+                throw new NoSuchElementException("error when read data " + e.getMessage());
             }
             readRowCount++;
             return readBuffer;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
index fb17ceb..a5c7e16 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
@@ -21,8 +21,11 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 public class FSInputRLECompressedColumnReader implements ColumnDataReader {
@@ -34,10 +37,10 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
 
     private int rowCount;
 
-    public FSInputRLECompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public FSInputRLECompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+                                            int columnDataLength, int rowCount) throws IOException {
         this.rowCount = rowCount;
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         fsInputStream.seek(footStartOffset);
         this.numValInBlock = fsInputStream.readInt();
@@ -84,6 +87,9 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             if (readRLEntryValCnt >= currRLEntryValCnt) {
                 loadNextEntry();
             }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
index b923220..bb09240 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
@@ -26,6 +26,7 @@ import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 public class LZ4CompressedColumnReader implements ColumnDataReader {
     private int rowCount;
@@ -36,16 +37,13 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
 
     private int currBlockNum;
     private LZ4SafeDecompressor deCompressor;
-    private ByteBuffer dataBuffer;
     private ByteBuffer decompressedBuffer;
 
-//    private byte[] readBuffer;
     private GeneralColumnDataReader blockDataReader;
 
     public LZ4CompressedColumnReader(ByteBuffer dataBuffer, int columnDataStartOffset, int columnDataLength,
                                  int rowCount) {
         this.rowCount = rowCount;
-        this.dataBuffer = dataBuffer;
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         dataBuffer.position(footStartOffset);
         this.numValInBlock = dataBuffer.getInt();
@@ -53,7 +51,6 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
 
         this.blockDataReader = new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength - 8);
         this.currBlockNum = -1;
-//        this.readBuffer = new byte[valLen];
 
         this.deCompressor = LZ4Factory.fastestInstance().safeDecompressor();
         this.maxBufferLength = numValInBlock * valLen;
@@ -104,6 +101,9 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
                 loadNextBuffer();
             }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
index 77abd76..9378f3e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
@@ -67,6 +68,9 @@ public class NoCompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             byte[] readBuffer = new byte[colValLength];
             dataBuffer.get(readBuffer);
             readRowCount++;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
index 5346a72..131da93 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
@@ -123,6 +124,10 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
             if (readRLEntryValCnt >= currRLEntryValCnt) {
                 loadNextEntry();
             }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
index 640862b..4d5d819 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
@@ -113,6 +113,7 @@ public class RecordsSerializer {
 
             @Override
             public Record next() {
+
                 deserializeRecord(oneRecord, inputBuffer);
                 return oneRecord;
             }
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
index 04a9193..40b6443 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -77,8 +76,7 @@ public class LZ4CompressColumnTest {
         int compressedSize = writeCompressedData(rowCnt);
         System.out.println("compressed data size:" + compressedSize);
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fsInputStream, 0,
+        try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0,
                 compressedSize, rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
index 0b1b92c..43d97ea 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -74,8 +73,8 @@ public class NoCompressColumnTest {
         writeNoCompressedData(rowCnt);
 
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fsInputStream, 0, 4, rowCnt)) {
+        try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fs,
+                new Path(tmpColFile.getAbsolutePath()), 0, 4, rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
                 assertEquals(k, Bytes.toInt(val));
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
index 3015aa7..bc31db6 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -89,8 +88,7 @@ public class RunLengthCompressColumnTest {
 
         int size = writeCompressData1(rowCnt);
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fsInputStream, 0, size,
+        try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0, size,
                 rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
diff --git a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
index 9f32de1..800d4c9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
@@ -70,6 +70,10 @@ public class ProbabilityGenerator {
         double[] pQueryArray = new double[nOfEle];
 
         int sumHit = generateHitNumberList(nHitArray);
+
+        if (sumHit == 0)
+            throw new IllegalStateException();
+
         for (int i = 0; i < nOfEle; i++) {
             pQueryArray[i] = nHitArray[i] * 1.0 / sumHit;
         }

Reply via email to