This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 0b936eb minor, fix sonar reported bugs (#1368)
0b936eb is described below
commit 0b936eb16c7fb29d337f0e59759231030256ac28
Author: Shaofeng Shi <[email protected]>
AuthorDate: Sun Aug 23 16:32:45 2020 +0800
minor, fix sonar reported bugs (#1368)
* minor, fix sonar reported bugs
* minor, fix sonar
* minor, code format
---
.../java/org/apache/kylin/common/util/Bytes.java | 2 +-
.../cube/inmemcubing/AbstractInMemCubeBuilder.java | 8 +--
.../cube/inmemcubing/ICuboidGTTableWriter.java | 8 +--
.../kylin/cube/inmemcubing/InMemCubeBuilder.java | 11 +--
.../gridtable/GTAggregateTransformScanner.java | 37 ++++++++--
.../gridtable/GTTwoLayerAggregateScanner.java | 2 +
.../kylin/cube/inmemcubing/MemDiskStoreTest.java | 11 +--
.../kylin/gridtable/SimpleGridTableTest.java | 84 +++++++++++-----------
.../apache/kylin/dict/GlobalDictionaryBuilder.java | 11 +--
.../kylin/metadata/streaming/StreamingConfig.java | 2 +-
.../kylin/storage/gtrecord/DictGridTableTest.java | 26 +++----
.../engine/mr/common/MapReduceExecutable.java | 14 ++--
.../mr/streaming/ColumnarSplitDataReader.java | 2 +-
.../kylin/engine/mr/streaming/DictsReader.java | 44 +++++-------
.../kylin/engine/mr/streaming/RowRecordReader.java | 41 +++++------
.../kylin/realtime/BuildCubeWithStreamV2.java | 2 +-
.../apache/kylin/query/relnode/OLAPContext.java | 12 ++--
.../coprocessor/endpoint/CubeVisitServiceTest.java | 25 +++----
.../columnar/ColumnarMemoryStorePersister.java | 12 ++--
.../storage/columnar/ColumnarStoreDimDesc.java | 13 ++--
.../storage/columnar/ColumnarStoreMetricsDesc.java | 13 ++--
.../columnar/FSInputGeneralColumnDataReader.java | 6 +-
.../storage/columnar/FragmentCuboidReader.java | 12 ++--
.../core/storage/columnar/SegmentMemoryStore.java | 29 ++++----
.../compress/FSInputLZ4CompressedColumnReader.java | 13 ++--
.../compress/FSInputNoCompressedColumnReader.java | 18 ++---
.../compress/FSInputRLECompressedColumnReader.java | 12 +++-
.../compress/LZ4CompressedColumnReader.java | 8 +--
.../compress/NoCompressedColumnReader.java | 4 ++
.../compress/RunLengthCompressedColumnReader.java | 5 ++
.../kylin/stream/core/util/RecordsSerializer.java | 1 +
.../columnar/compress/LZ4CompressColumnTest.java | 4 +-
.../columnar/compress/NoCompressColumnTest.java | 5 +-
.../compress/RunLengthCompressColumnTest.java | 4 +-
.../kylin/tool/query/ProbabilityGenerator.java | 4 ++
35 files changed, 273 insertions(+), 232 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
index 65f6fef..2f13fba 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
@@ -71,7 +71,7 @@ public class Bytes {
/**
* Size of boolean in bytes
*/
- public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+ public static final int SIZEOF_BOOLEAN = 1;
/**
* Size of byte in bytes
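Sonar flags the old initializer because Byte.SIZE / Byte.SIZE is a value divided by itself and always evaluates to 1, which hides the intent behind arithmetic. A minimal sketch of the convention, assuming nothing beyond the JDK (the class name Sizes is illustrative, not Kylin's):

    public final class Sizes {
        // A boolean serializes to a single byte; state that directly
        // instead of computing Byte.SIZE / Byte.SIZE, which is always 1.
        public static final int SIZEOF_BOOLEAN = 1;
        // For wider primitives, deriving bytes from the bit count still reads well.
        public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE; // 8

        private Sizes() {
        }
    }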
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index df1fa7a..89c2cd8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -101,11 +101,11 @@ abstract public class AbstractInMemCubeBuilder {
    protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
        long startTime = System.currentTimeMillis();
        GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- output.write(cuboidId, record);
+ try (IGTScanner scanner = gridTable.scan(req)) {
+ for (GTRecord record : scanner) {
+ output.write(cuboidId, record);
+ }
}
- scanner.close();
        logger.debug("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
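The change above replaces a trailing scanner.close(), which is skipped whenever the loop body throws, with try-with-resources, which closes the scanner on every exit path. A self-contained sketch of the pattern, assuming only the JDK (Scanner below is a stand-in for IGTScanner, not a Kylin type):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;

    interface Scanner extends Iterable<String>, AutoCloseable {
        @Override
        void close() throws IOException;
    }

    public class TryWithResourcesDemo {
        static Scanner open() {
            return new Scanner() {
                public Iterator<String> iterator() {
                    return Arrays.asList("r1", "r2").iterator();
                }
                public void close() {
                    System.out.println("scanner closed");
                }
            };
        }

        public static void main(String[] args) throws IOException {
            // The scanner is closed even if the loop throws, which the
            // old trailing scanner.close() did not guarantee.
            try (Scanner scanner = open()) {
                for (String record : scanner) {
                    System.out.println(record);
                }
            }
        }
    }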
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
index 93a7994..5c32d6c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -37,11 +37,11 @@ public abstract class ICuboidGTTableWriter implements ICuboidWriter{
public void write(long cuboidId, GridTable gridTable) throws IOException {
long startTime = System.currentTimeMillis();
GTScanRequest req = new
GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
- IGTScanner scanner = gridTable.scan(req);
- for (GTRecord record : scanner) {
- write(cuboidId, record);
+ try (IGTScanner scanner = gridTable.scan(req)) {
+ for (GTRecord record : scanner) {
+ write(cuboidId, record);
+ }
}
- scanner.close();
        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
}
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 403ec27..eabad14 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -445,15 +445,14 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
long startTime = System.currentTimeMillis();
logger.info("Calculating cuboid {}", cuboidId);
-        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
-        GTBuilder builder = newGridTable.rebuild();
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
         ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
GTRecord newRecord = new GTRecord(newGridTable.getInfo());
int count = 0;
- try {
+        try (GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
+ GTBuilder builder = newGridTable.rebuild()) {
for (GTRecord record : scanner) {
count++;
for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
@@ -462,10 +461,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
builder.write(newRecord);
}
-
- } finally {
- scanner.close();
- builder.close();
}
long timeSpent = System.currentTimeMillis() - startTime;
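When two resources share one try, Java closes them in reverse declaration order, so builder is closed before scanner here; and unlike the removed finally block, an exception from one close() no longer skips the other. A runnable sketch of that ordering (Resource is an illustrative stand-in):

    public class ReverseCloseDemo {
        static class Resource implements AutoCloseable {
            private final String name;
            Resource(String name) {
                this.name = name;
                System.out.println("open " + name);
            }
            @Override
            public void close() {
                System.out.println("close " + name);
            }
        }

        public static void main(String[] args) {
            // Prints: open scanner, open builder, close builder, close scanner.
            try (Resource scanner = new Resource("scanner");
                    Resource builder = new Resource("builder")) {
                System.out.println("work");
            }
        }
    }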
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
index 0f805e8..c59ae36 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
@@ -38,6 +38,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
private final GTScanRequest req;
private final GTTwoLayerAggregateParam twoLayerAggParam;
private long inputRowCount = 0L;
+ private Iterator<GTRecord> iterator = null;
    public GTAggregateTransformScanner(IGTScanner inputScanner, GTScanRequest req) {
this.inputScanner = inputScanner;
@@ -57,9 +58,13 @@ public class GTAggregateTransformScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
-        return twoLayerAggParam.satisfyPrefix(req.getDimensions())
-                ? new FragmentTransformGTRecordIterator(inputScanner.iterator())
-                : new NormalTransformGTRecordIterator(inputScanner.iterator());
+        if (iterator == null) {
+            iterator = twoLayerAggParam.satisfyPrefix(req.getDimensions())
+                    ? new FragmentTransformGTRecordIterator(inputScanner.iterator())
+                    : new NormalTransformGTRecordIterator(inputScanner.iterator());
+        }
+
+        return iterator;
}
public long getInputRowCount() {
@@ -75,6 +80,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
    private class FragmentTransformGTRecordIterator extends TransformGTRecordIterator {
private final PrefixFragmentIterator fragmentIterator;
+ private GTAggregateScanner aggregateScanner;
private Iterator<GTRecord> transformedFragment = null;
FragmentTransformGTRecordIterator(Iterator<GTRecord> input) {
@@ -88,6 +94,12 @@ public class GTAggregateTransformScanner implements IGTScanner {
}
if (!fragmentIterator.hasNext()) {
+ // close resource
+ try {
+ aggregateScanner.close();
+ } catch (IOException e) {
+ // do nothing
+ }
return false;
}
@@ -106,7 +118,8 @@ public class GTAggregateTransformScanner implements IGTScanner {
return fragmentIterator.next();
}
};
-            transformedFragment = new GTAggregateScanner(fragmentScanner, innerReq).iterator();
+            aggregateScanner = new GTAggregateScanner(fragmentScanner, innerReq);
+ transformedFragment = aggregateScanner.iterator();
return hasNext();
}
@@ -124,6 +137,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
    private class NormalTransformGTRecordIterator extends TransformGTRecordIterator {
private final Iterator<GTRecord> aggRecordIterator;
+ private final GTAggregateScanner aggregateScanner;
NormalTransformGTRecordIterator(final Iterator<GTRecord> input) {
IGTScanner gtScanner = new IGTScanner() {
@@ -134,6 +148,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
@Override
public void close() throws IOException {
+ // do nothing
}
@Override
@@ -141,13 +156,21 @@ public class GTAggregateTransformScanner implements IGTScanner {
return input;
}
};
-
-            aggRecordIterator = new GTAggregateScanner(gtScanner, innerReq).iterator();
+ aggregateScanner = new GTAggregateScanner(gtScanner, innerReq);
+ aggRecordIterator = aggregateScanner.iterator();
}
@Override
public boolean hasNext() {
- return aggRecordIterator.hasNext();
+ boolean hasNext = aggRecordIterator.hasNext();
+ if (!hasNext) {
+ try {
+ aggregateScanner.close();
+ } catch (IOException e) {
+ // do nothing
+ }
+ }
+ return hasNext;
}
@Override
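Both rewritten iterators now own their GTAggregateScanner and close it the first time hasNext() returns false, while iterator() caches its result so a second call cannot leak a fresh scanner. A condensed sketch of the close-on-exhaustion idea with hypothetical names (ClosingIterator is not a Kylin class):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Iterator;

    class ClosingIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final Closeable resource;
        private boolean closed;

        ClosingIterator(Iterator<T> delegate, Closeable resource) {
            this.delegate = delegate;
            this.resource = resource;
        }

        @Override
        public boolean hasNext() {
            boolean hasNext = delegate.hasNext();
            if (!hasNext && !closed) {
                closed = true;
                try {
                    resource.close(); // release once the data is exhausted
                } catch (IOException e) {
                    // swallowed, as the patched scanner does
                }
            }
            return hasNext;
        }

        @Override
        public T next() {
            return delegate.next();
        }
    }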
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
index 5df96e2..58b5c1e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
@@ -46,6 +46,8 @@ public class GTTwoLayerAggregateScanner implements IGTScanner {
@Override
public void close() throws IOException {
inputScanner.close();
+ secondLayerInputScanner.close();
+ outputScanner.close();
}
@Override
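close() now releases every layer of the scanner chain rather than only the innermost input. A minimal sketch of a composite owning several closeables (LayeredScanner is an illustrative name); note that with plain sequential closes, an exception from an earlier close() still skips the later ones, so per-layer try/finally would be stricter than what the patch does:

    import java.io.Closeable;
    import java.io.IOException;

    class LayeredScanner implements Closeable {
        private final Closeable inputScanner;
        private final Closeable secondLayerInputScanner;
        private final Closeable outputScanner;

        LayeredScanner(Closeable in, Closeable second, Closeable out) {
            this.inputScanner = in;
            this.secondLayerInputScanner = second;
            this.outputScanner = out;
        }

        @Override
        public void close() throws IOException {
            // Mirrors the patch: all three layers are released, not just the input.
            inputScanner.close();
            secondLayerInputScanner.close();
            outputScanner.close();
        }
    }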
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index f457ea6..bd147bf 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -102,11 +102,12 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
}
builder.close();
-        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
+        try (IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+ int i = 0;
+ for (GTRecord r : scanner) {
+ assertEquals(data.get(i++), r);
+ }
}
- scanner.close();
}
}
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 14a25c5..6f3de1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -51,7 +51,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
GridTable table = new GridTable(info, store);
GTBuilder builder = rebuild(table);
- IGTScanner scanner = scan(table);
+ scan(table);
}
@Test
@@ -61,7 +61,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
GridTable table = new GridTable(info, store);
GTBuilder builder = rebuild(table);
- IGTScanner scanner = scan(table);
+ scan(table);
}
@Test
@@ -71,7 +71,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
GridTable table = new GridTable(info, store);
GTBuilder builder = rebuild(table);
- IGTScanner scanner = scanAndAggregate(table);
+ scanAndAggregate(table);
}
@Test
@@ -81,55 +81,53 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
GridTable table = new GridTable(info, store);
rebuildViaAppend(table);
- IGTScanner scanner = scan(table);
+ scan(table);
}
- private IGTScanner scan(GridTable table) throws IOException {
+ private void scan(GridTable table) throws IOException {
GTScanRequest req = new
GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
- IGTScanner scanner = table.scan(req);
- for (GTRecord r : scanner) {
- Object[] v = r.getValues();
- assertTrue(((String) v[0]).startsWith("2015-"));
- assertTrue(((String) v[2]).equals("Food"));
- assertTrue(((Long) v[3]).longValue() == 10);
- assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
- System.out.println(r);
+ try (IGTScanner scanner = table.scan(req)) {
+ for (GTRecord r : scanner) {
+ Object[] v = r.getValues();
+ assertTrue(((String) v[0]).startsWith("2015-"));
+ assertTrue(((String) v[2]).equals("Food"));
+ assertTrue(((Long) v[3]).longValue() == 10);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+ System.out.println(r);
+ }
}
- scanner.close();
- return scanner;
}
- private IGTScanner scanAndAggregate(GridTable table) throws IOException {
+ private void scanAndAggregate(GridTable table) throws IOException {
GTScanRequest req = new
GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0,
2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[] { "count",
"sum" }).setFilterPushDown(null).createGTScanRequest();
- IGTScanner scanner = table.scan(req);
- int i = 0;
- for (GTRecord r : scanner) {
- Object[] v = r.getValues();
- switch (i) {
- case 0:
- assertTrue(((Long) v[3]).longValue() == 20);
- assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
- break;
- case 1:
- assertTrue(((Long) v[3]).longValue() == 30);
- assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
- break;
- case 2:
- assertTrue(((Long) v[3]).longValue() == 40);
- assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
- break;
- case 3:
- assertTrue(((Long) v[3]).longValue() == 10);
- assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
- break;
- default:
- fail();
+ try (IGTScanner scanner = table.scan(req)) {
+ int i = 0;
+ for (GTRecord r : scanner) {
+ Object[] v = r.getValues();
+ switch (i) {
+ case 0:
+ assertTrue(((Long) v[3]).longValue() == 20);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+ break;
+ case 1:
+ assertTrue(((Long) v[3]).longValue() == 30);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+ break;
+ case 2:
+ assertTrue(((Long) v[3]).longValue() == 40);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+ break;
+ case 3:
+ assertTrue(((Long) v[3]).longValue() == 10);
+ assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+ break;
+ default:
+ fail();
+ }
+ i++;
+ System.out.println(r);
}
- i++;
- System.out.println(r);
}
- scanner.close();
- return scanner;
}
static GTBuilder rebuild(GridTable table) throws IOException {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index d813793..ba31e2b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -46,9 +46,6 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
@Override
    public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
        sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
-        lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
-        lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);
-
        int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
if (hdfsDir == null) {
//build in Kylin job server
@@ -56,12 +53,18 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
}
        String baseDir = hdfsDir + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
+        lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+ String lockPath = getLockPath(sourceColumn);
try {
+ lock.lock(lockPath, Long.MAX_VALUE);
            this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true);
} catch (Throwable e) {
- lock.unlock(getLockPath(sourceColumn));
            throw new RuntimeException(
                    String.format(Locale.ROOT, "Failed to create global dictionary on %s ", sourceColumn), e);
+ } finally {
+ if (lock.isLockedByMe(lockPath)) {
+ lock.unlock(lockPath);
+ }
}
this.baseId = baseId;
}
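The lock is now taken inside the try and released in a finally guarded by isLockedByMe, so init() can neither return while still holding the lock nor unlock a path it never acquired. A sketch of the shape; DistributedLock below is a hypothetical stand-in, not Kylin's actual interface:

    interface DistributedLock {
        boolean lock(String path, long timeout);
        void unlock(String path);
        boolean isLockedByMe(String path);
    }

    class GuardedLockDemo {
        static void buildUnderLock(DistributedLock lock, String lockPath) {
            try {
                lock.lock(lockPath, Long.MAX_VALUE);
                // ... critical section: create the dictionary builder ...
            } finally {
                // Release on every exit path, but only if we actually hold it.
                if (lock.isLockedByMe(lockPath)) {
                    lock.unlock(lockPath);
                }
            }
        }
    }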
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
index 335d3c8..3de65ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -38,7 +38,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class StreamingConfig extends RootPersistentEntity {
-    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+    private static final Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<>(StreamingConfig.class);
public static final String STREAMING_TYPE_KAFKA = "kafka";
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index c49edc7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -364,12 +364,12 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
long start = System.currentTimeMillis();
        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
                .setFilterPushDown(filter).createGTScanRequest();
- IGTScanner scanner = table.scan(req);
int i = 0;
- for (GTRecord r : scanner) {
- i++;
+ try (IGTScanner scanner = table.scan(req)) {
+ for (GTRecord r : scanner) {
+ i++;
+ }
}
- scanner.close();
long end = System.currentTimeMillis();
        System.out.println(
                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
@@ -555,17 +555,17 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
System.out.println(req);
- IGTScanner scanner = table.scan(req);
- int i = 0;
- for (GTRecord r : scanner) {
- System.out.println(r);
- if (verifyRows == null || i >= verifyRows.length) {
- Assert.fail();
+ try (IGTScanner scanner = table.scan(req)) {
+ int i = 0;
+ for (GTRecord r : scanner) {
+ System.out.println(r);
+ if (verifyRows == null || i >= verifyRows.length) {
+ Assert.fail();
+ }
+ assertEquals(verifyRows[i], r.toString());
+ i++;
}
- assertEquals(verifyRows[i], r.toString());
- i++;
}
- scanner.close();
}
public static ByteArray enc(GTInfo info, int col, String value) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4dd68a5..62fb814 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -371,17 +371,17 @@ public class MapReduceExecutable extends AbstractExecutable {
long lockStartTime = System.currentTimeMillis();
boolean isLockedByTheJob = lock.isLocked(fullLockPath);
- logger.info("cube job {} zk lock is isLockedByTheJob:{}", getId(),
isLockedByTheJob);
- if (!isLockedByTheJob) {//not lock by the job
+ logger.info("cube job {} zk lock is isLockedByTheJob: {}", getId(),
isLockedByTheJob);
+ if (!isLockedByTheJob) { //not lock by the job
while (isLockedByOther) {
- isLockedByOther =
lock.isLocked(getCubeJobLockParentPathName());//other job global lock
+ isLockedByOther =
lock.isLocked(getCubeJobLockParentPathName()); //other job global lock
- if (!isLockedByOther) {//not lock by other job
+ if (!isLockedByOther) { //not lock by other job
isLockedByOther = lock.isLocked(ephemeralLockPath);//check
the ephemeral current lock
logger.info("zookeeper lock path :{}, is locked by other
job result is {}", ephemeralLockPath,
isLockedByOther);
- if (!isLockedByOther) {//the ephemeral lock not lock by
other job
+ if (!isLockedByOther) { //the ephemeral lock not lock by
other job
//try to get ephemeral lock
try {
logger.debug("{} before start to get lock
ephemeralLockPath {}", getId(),
@@ -413,12 +413,12 @@ public class MapReduceExecutable extends AbstractExecutable {
}
}
}
-                    isLockedByOther = true;//get lock fail,will try again
+                    isLockedByOther = true;//get lock fail, will try again
}
}
// wait 1 min and try again
                logger.info(
-                        "{}, parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute",
+                        "{}, parent lock path({}) is locked by other job result is {} ,ephemeral lock path: {} is locked by other job result is {}, will try after one minute",
                        getId(), getCubeJobLockParentPathName(), isLockedByOther, ephemeralLockPath, isLockedByOther);
Thread.sleep(60000);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
index d426cfc..9d33119 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
@@ -52,7 +52,7 @@ public class ColumnarSplitDataReader extends ColumnarSplitReader {
init(inputSplit, context);
}
-    public void init(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    public void init(InputSplit split, TaskAttemptContext context) throws IOException {
        baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
index b543c8b..7aa9d19 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
@@ -36,43 +36,33 @@ import org.apache.kylin.shaded.com.google.common.collect.ImmutableMap;
public class DictsReader extends ColumnarFilesReader {
    private static final Logger logger = LoggerFactory.getLogger(DictsReader.class);
- private FSDataInputStream metaInputStream;
- private FSDataInputStream dataInputStream;
public DictsReader(Path path, FileSystem fileSystem) throws IOException {
super(fileSystem, path);
}
public ImmutableMap<String, Dictionary> readDicts() throws IOException {
-        metaInputStream = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
-        List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.<String, Dictionary> builder();
-        dataInputStream = fs.open(dataFilePath);
-        Dictionary dict;
-        String colName;
-        logger.info("Reading dictionary from {}", dataFilePath.getName());
-        for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
-            dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            logger.info("Add dict for {}", colName);
-            builder.put(colName, dict);
+        try (FSDataInputStream metaInputStream = fs.open(metaFilePath);
+                FSDataInputStream dataInputStream = fs.open(dataFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
+            List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
+            ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.builder();
+            Dictionary dict;
+            String colName;
+            logger.info("Reading dictionary from {}", dataFilePath.getName());
+            for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
+                dataInputStream.seek(dimDictMetaInfo.getStartOffset());
+                dict = DictionarySerializer.deserialize(dataInputStream);
+                colName = dimDictMetaInfo.getDimName();
+                logger.info("Add dict for {}", colName);
+                builder.put(colName, dict);
+            }
+            return builder.build();
        }
-        return builder.build();
}
@Override
public void close() throws IOException{
- try {
- if (metaInputStream != null) {
- metaInputStream.close();
- }
- if (dataInputStream != null) {
- dataInputStream.close();
- }
- } catch (IOException e) {
- logger.error("close file error", e);
- }
+
}
}
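readDicts() now opens and closes both streams itself through try-with-resources, so the stream fields and the old error-swallowing close() logic disappear, and close() is left as an intentional no-op. A compact JDK-only sketch of that ownership shape (DictsReaderSketch and its types are illustrative):

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    class DictsReaderSketch implements AutoCloseable {
        private final Path metaFile;
        private final Path dataFile;

        DictsReaderSketch(Path metaFile, Path dataFile) {
            this.metaFile = metaFile;
            this.dataFile = dataFile;
        }

        Map<String, byte[]> readDicts() throws IOException {
            // Both streams live exactly as long as this method, on every path.
            try (InputStream meta = Files.newInputStream(metaFile);
                    InputStream data = Files.newInputStream(dataFile)) {
                Map<String, byte[]> dicts = new HashMap<>();
                // ... parse meta, seek into data, fill dicts ...
                return dicts;
            }
        }

        @Override
        public void close() {
            // Intentionally empty: readDicts() releases its own resources.
        }
    }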
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
index 2206fd7..1e610bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
@@ -82,24 +82,25 @@ public class RowRecordReader extends ColumnarFilesReader {
}
public void initReaders() throws IOException {
-        FSDataInputStream in = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
-        CuboidMetaInfo basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
-        FSDataInputStream dictInputStream = fs.open(dataFilePath);
+        List<DimensionMetaInfo> allDimensions;
+        CuboidMetaInfo basicCuboidMetaInfo;
+        Map<String, DimensionEncoding> dimensionEncodingMap;
+        try (FSDataInputStream in = fs.open(metaFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
+            basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
+            allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
+            dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions, dataFilePath);
+        }
-        List<DimensionMetaInfo> allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
-        Map<String, DimensionEncoding> dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions,
-                dictInputStream);
dimensionColumnReaders = Lists.newArrayList();
dimensionColumnReaderItrs = Lists.newArrayList();
dimensionEncodings = Lists.newArrayList();
for (DimensionMetaInfo dimensionMetaInfo : allDimensions) {
-            FSDataInputStream dimInputStream = fs.open(dataFilePath);
            String dimName = dimensionMetaInfo.getName();
            DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName);
            ColumnarStoreDimDesc dimDesc = new ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(), dimensionMetaInfo.getCompressionType());
-            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(dimInputStream,
+            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(fs, dataFilePath,
                    dimensionMetaInfo.getStartOffset(), dimensionMetaInfo.getDataLength(), (int) basicCuboidMetaInfo.getNumberOfRows());
dimensionColumnReaders.add(dimDataReader);
@@ -112,13 +113,13 @@ public class RowRecordReader extends ColumnarFilesReader {
metricsColumnReaderItrs = Lists.newArrayList();
metricsDataTransformers = Lists.newArrayList();
        for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) {
-            FSDataInputStream metricsInputStream = fs.open(dataFilePath);
+
            MeasureDesc measure = findMeasure(metricMetaInfo.getName());
            DataType metricsDataType = measure.getFunction().getReturnDataType();
            ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
            ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding, metricMetaInfo.getCompressionType());
-            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(fs, dataFilePath,
                    metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(), (int) basicCuboidMetaInfo.getNumberOfRows());
metricsColumnReaders.add(metricsDataReader);
@@ -140,8 +141,11 @@ public class RowRecordReader extends ColumnarFilesReader {
}
    private Map<String, DimensionEncoding> getDimensionEncodings(FragmentMetaInfo fragmentMetaInfo,
-            List<DimensionMetaInfo> allDimensions, FSDataInputStream dictInputStream) throws IOException {
-        Map<String, Dictionary> dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+            List<DimensionMetaInfo> allDimensions, Path dataPath) throws IOException {
+        Map<String, Dictionary> dictionaryMap;
+        try (FSDataInputStream dictInputStream = fs.open(dataPath)) {
+            dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+        }
Map<String, DimensionEncoding> result = Maps.newHashMap();
for (DimensionMetaInfo dimension : allDimensions) {
@@ -152,7 +156,7 @@ public class RowRecordReader extends ColumnarFilesReader {
                Dictionary<String> dict = dictionaryMap.get(dimension.getName());
                if (dict == null) {
                    logger.error("No dictionary found for dict-encoding column " + col);
-                    throw new RuntimeException("No dictionary found for dict-encoding column " + col);
+                    throw new IllegalStateException("No dictionary found for dict-encoding column " + col);
} else {
                    result.put(dimension.getName(), new DictionaryDimEnc(dict));
}
@@ -169,14 +173,11 @@ public class RowRecordReader extends ColumnarFilesReader {
    public Map<String, Dictionary> readAllDimensionsDictionary(FragmentMetaInfo fragmentMetaInfo,
            FSDataInputStream dataInputStream) throws IOException {
        List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.<String, Dictionary> builder();
-        Dictionary dict;
-        String colName;
+        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.builder();
        for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
            dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            builder.put(colName, dict);
+            Dictionary dict = DictionarySerializer.deserialize(dataInputStream);
+            builder.put(dimDictMetaInfo.getDimName(), dict);
}
return builder.build();
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index 781c62a..fb85b26 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -249,7 +249,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
}
if (consumeDataDone == false) {
-            throw new IllegalStateException("Exec timeout, data not be comsumed completely"); // ensure all messages have been comsumed.
+            throw new IllegalStateException("Exec timeout, data not be consumed completely"); // ensure all messages have been comsumed.
}
waittime = 0;
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 423a4f0..2f3ad4c 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -67,9 +67,9 @@ public class OLAPContext {
    public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult";
public static final String PRM_USER_AUTHEN_INFO = "UserAuthenInfo";
-    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<Map<String, String>>();
+    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<>();
-    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<Map<Integer, OLAPContext>>();
+    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<>();
public static void setParameters(Map<String, String> parameters) {
_localPrarameters.set(parameters);
@@ -81,7 +81,7 @@ public class OLAPContext {
public static void registerContext(OLAPContext ctx) {
if (_localContexts.get() == null) {
-            Map<Integer, OLAPContext> contextMap = new HashMap<Integer, OLAPContext>();
+ Map<Integer, OLAPContext> contextMap = new HashMap<>();
_localContexts.set(contextMap);
}
_localContexts.get().put(ctx.id, ctx);
@@ -147,7 +147,7 @@ public class OLAPContext {
public Set<TblColRef> allColumns = new HashSet<>();
public List<TblColRef> groupByColumns = new ArrayList<>();
-    public Set<TblColRef> subqueryJoinParticipants = new HashSet<TblColRef>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
+    public Set<TblColRef> subqueryJoinParticipants = new HashSet<>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
public Set<TblColRef> metricsColumns = new HashSet<>();
    public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply
    public List<TblColRef> aggrOutCols = new ArrayList<>(); // aggregation output (inner) columns
@@ -175,7 +175,7 @@ public class OLAPContext {
public OLAPAuthentication olapAuthen = new OLAPAuthentication();
public boolean isSimpleQuery() {
-        return (joins.size() == 0) && (groupByColumns.size() == 0) && (aggregations.size() == 0);
+        return (joins.isEmpty()) && (groupByColumns.isEmpty()) && (aggregations.isEmpty());
}
SQLDigest sqlDigest;
@@ -343,7 +343,7 @@ public class OLAPContext {
    // ============================================================================
    public interface IAccessController {
-        public void check(List<OLAPContext> contexts, KylinConfig config) throws IllegalStateException;
+ void check(List<OLAPContext> contexts, KylinConfig config);
}
}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
index 6650e6d..dfae455 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
@@ -137,18 +137,19 @@ public class CubeVisitServiceTest extends LocalFileMetadataTestCase {
gtInfo = newInfo();
GridTable gridTable = newTable(gtInfo);
-        IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
-                .setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        for (GTRecord record : scanner) {
-            byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
-            byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
-            System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
-                    RowConstants.ROWKEY_CUBOIDID_LEN);
-            System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
-            Put put = new Put(key);
-            put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
-            put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
-            region.put(put);
+        try (IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+            for (GTRecord record : scanner) {
+                byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
+                byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
+                System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
+                        RowConstants.ROWKEY_CUBOIDID_LEN);
+                System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
+                Put put = new Put(key);
+                put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
+                put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
+                region.put(put);
+            }
}
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
index 8114360..02a6ad6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
@@ -125,7 +125,7 @@ public class ColumnarMemoryStorePersister {
        FileOutputStream fragmentOutputStream = FileUtils.openOutputStream(fragment.getDataFile());
        try (CountingOutputStream fragmentOut = new CountingOutputStream(new BufferedOutputStream(fragmentOutputStream))) {
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
+            ConcurrentMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
            List<List<Object>> basicCuboidColumnarValues = transformToColumnar(baseCuboidId, dimensions.length, basicCuboidData);
// persist dictionaries
@@ -139,13 +139,13 @@ public class ColumnarMemoryStorePersister {
            long totalRowCnt = basicCuboidMeta.getNumberOfRows();
            // persist additional cuboids
-            Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
+            Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
                    .getAdditionalCuboidsData();
            if (additionalCuboidsData != null && additionalCuboidsData.size() > 0) {
-                for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
+                for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
                        .entrySet()) {
                    CuboidInfo cuboidInfo = cuboidDataEntry.getKey();
-                    ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
+                    ConcurrentMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
                    List<List<Object>> cuboidColumnarValues = transformToColumnar(cuboidInfo.getCuboidID(), cuboidInfo.getDimCount(), cuboidData);
                    CuboidMetaInfo cuboidMeta = persistCuboidData(cuboidInfo.getCuboidID(), cuboidInfo.getDimensions(),
@@ -171,7 +171,7 @@ public class ColumnarMemoryStorePersister {
* @return
*/
private List<List<Object>> transformToColumnar(long cuboidId, int dimCnt,
- ConcurrentSkipListMap<String[], MeasureAggregator[]> aggBufMap) {
+ ConcurrentMap<String[], MeasureAggregator[]> aggBufMap) {
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
int columnsNum = dimCnt + measures.length;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
index 971d137..b762c7e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
@@ -22,7 +22,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
@@ -81,14 +82,14 @@ public class ColumnarStoreDimDesc {
        return new NoCompressedColumnReader(dataBuffer, columnDataStartOffset, columnDataLength / rowCount, rowCount);
}
-    public ColumnDataReader getDimReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getDimReaderFromFSInput(FileSystem fs, Path file, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
        if (compression == Compression.LZ4) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
        } else if (compression == Compression.RUN_LENGTH) {
-            return new FSInputRLECompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputRLECompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
        }
-        return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+        return new FSInputNoCompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength / rowCount,
                rowCount);
}
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
index eca5ae7..a3f8de9 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
@@ -23,7 +23,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputLZ4CompressedColumnReader;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputNoCompressedColumnReader;
@@ -74,16 +75,16 @@ public class ColumnarStoreMetricsDesc {
        return new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength);
}
-    public ColumnDataReader getMetricsReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getMetricsReaderFromFSInput(FileSystem fs, Path filePath, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
        if (Compression.LZ4 == compression && fixLen != -1) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength, rowCount);
        }
        if (fixLen != -1) {
-            return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+            return new FSInputNoCompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength / rowCount,
                rowCount);
        }
-        return new FSInputGeneralColumnDataReader(inputStream, columnDataStartOffset, columnDataLength);
+        return new FSInputGeneralColumnDataReader(fs, filePath, columnDataStartOffset, columnDataLength);
}
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
index 5282533..363886b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
@@ -23,14 +23,16 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
public class FSInputGeneralColumnDataReader implements ColumnDataReader {
private FSDataInputStream fsInputStream;
private int numOfVals;
-    public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
+    public FSInputGeneralColumnDataReader(FileSystem fs, Path file, int dataStartOffset, int dataLength)
            throws IOException {
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
fsInputStream.seek(dataStartOffset + dataLength - 4L);
this.numOfVals = fsInputStream.readInt();
fsInputStream.seek(dataStartOffset);
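Across these reader classes the constructors change from accepting a caller-opened FSDataInputStream to accepting FileSystem plus Path, so each reader opens a private stream that it alone owns and closes. A sketch of the ownership pattern against the Hadoop filesystem API the patch uses (OwnedStreamReader is an illustrative name):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class OwnedStreamReader implements AutoCloseable {
        private final FSDataInputStream in;

        OwnedStreamReader(FileSystem fs, Path file, long startOffset) throws IOException {
            this.in = fs.open(file); // private stream, never shared with callers
            in.seek(startOffset);
        }

        int readInt() throws IOException {
            return in.readInt();
        }

        @Override
        public void close() throws IOException {
            in.close(); // unambiguous: closes exactly the stream this reader opened
        }
    }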
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
index 2b5621b..432cbbf 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.DimensionEncoding;
@@ -39,14 +40,11 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
private long readRowCount = 0;
private int dimCnt;
private int metricCnt;
-
- private CubeDesc cubeDesc;
private ColumnDataReader[] dimensionDataReaders;
private ColumnDataReader[] metricDataReaders;
    public FragmentCuboidReader(CubeDesc cubeDesc, FragmentData fragmentData, CuboidMetaInfo cuboidMetaInfo,
            TblColRef[] dimensions, MeasureDesc[] measures, DimensionEncoding[] dimEncodings) {
- this.cubeDesc = cubeDesc;
this.dimCnt = dimensions.length;
this.metricCnt = measures.length;
this.dimensionDataReaders = new ColumnDataReader[dimCnt];
@@ -127,14 +125,14 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
return new Iterator<RawRecord>() {
@Override
public boolean hasNext() {
- if (readRowCount >= rowCount) {
- return false;
- }
- return true;
+ return readRowCount < rowCount;
}
@Override
public RawRecord next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
for (int i = 0; i < dimensionDataReaders.length; i++) {
oneRawRecord.setDimension(i, dimValItr[i].next());
}
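The added guard enforces the java.util.Iterator contract: calling next() past the end must throw NoSuchElementException instead of returning stale or garbage data. A minimal sketch (CountingIterator is illustrative):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    class CountingIterator implements Iterator<Long> {
        private final long rowCount;
        private long readRowCount;

        CountingIterator(long rowCount) {
            this.rowCount = rowCount;
        }

        @Override
        public boolean hasNext() {
            return readRowCount < rowCount;
        }

        @Override
        public Long next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return readRowCount++;
        }
    }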
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
index 2bf9127..24c107d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
@@ -25,8 +25,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -64,8 +66,8 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
protected final ParsedStreamingCubeInfo parsedStreamingCubeInfo;
protected final String segmentName;
-    private volatile Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
-    private volatile ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
+    private volatile Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
+    private volatile ConcurrentMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
    private volatile AtomicInteger rowCount = new AtomicInteger();
    private volatile AtomicInteger originRowCount = new AtomicInteger();
@@ -84,10 +86,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
        this.basicCuboidAggBufMap = new ConcurrentSkipListMap<>(StringArrayComparator.INSTANCE);
        List<CuboidInfo> additionalCuboids = parsedStreamingCubeInfo.getAdditionalCuboidsToBuild();
-        if (additionalCuboids != null && additionalCuboids.size() > 0) {
+        if (additionalCuboids != null && !additionalCuboids.isEmpty()) {
            this.cuboidsAggBufMap = new ConcurrentHashMap<>(additionalCuboids.size());
            for (CuboidInfo cuboidInfo : additionalCuboids) {
-                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<String[], MeasureAggregator[]>(
+                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<>(
                        StringArrayComparator.INSTANCE));
            }
}
@@ -107,10 +109,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
        Object[] metricsValues = buildValue(row);
        aggregate(basicCuboidAggBufMap, basicCuboidDimensions, metricsValues);
        if (cuboidsAggBufMap != null) {
-            for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
+            for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
                    .entrySet()) {
                CuboidInfo cuboidInfo = cuboidAggEntry.getKey();
-                ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
+                ConcurrentMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
                String[] cuboidDimensions = buildCuboidKey(cuboidInfo, row);
                aggregate(cuboidAggMap, cuboidDimensions, metricsValues);
            }
@@ -172,7 +174,7 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
    }
    @SuppressWarnings("unchecked")
-    private void aggregate(ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
+    private void aggregate(ConcurrentMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
            Object[] metricsValues) {
MeasureAggregator[] aggrs = cuboidAggBufMap.get(dimensions);
if (aggrs != null) {
@@ -217,20 +219,20 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
        return originRowCount.get();
    }
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getBasicCuboidData() {
+    public ConcurrentMap<String[], MeasureAggregator[]> getBasicCuboidData() {
        return basicCuboidAggBufMap;
    }
-    public Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
+    public Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
        return cuboidsAggBufMap;
    }
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
+    public ConcurrentMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
if (cuboidID == parsedStreamingCubeInfo.basicCuboid.getId()) {
return basicCuboidAggBufMap;
} else {
            CuboidInfo cuboidInfo = new CuboidInfo(cuboidID);
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
+            ConcurrentMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
if (result != null) {
return result;
}
@@ -357,6 +359,9 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
@Override
public Record next() {
+ if (nextEntry == null)
+ throw new NoSuchElementException();
+
try {
                    String[] allDimensions = nextEntry.getKey();
                    MeasureAggregator<?>[] allMetrics = nextEntry.getValue();
@@ -389,8 +394,6 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
                    System.arraycopy(aggrResult, 0, oneRecord.getMetrics(), 0, aggrResult.length);
count++;
return oneRecord;
- } catch (RuntimeException exception) {
- throw exception;
} finally {
nextEntry = null;
}
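Throughout this class the fields, parameters, and return types now name the ConcurrentMap interface, and only the new ConcurrentSkipListMap<>(...) call sites keep the concrete sorted implementation, which can therefore be swapped without touching a signature. A small sketch of the program-to-interface idea (names are illustrative):

    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    class ProgramToInterfaceDemo {
        // Declared against the interface; only construction names the class.
        private final ConcurrentMap<String, Long> counters =
                new ConcurrentSkipListMap<>();

        long increment(String key) {
            // ConcurrentMap's atomic merge works for any implementation.
            return counters.merge(key, 1L, Long::sum);
        }
    }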
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
index 0de561e..a00134f 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
import net.jpountz.lz4.LZ4Factory;
@@ -40,10 +42,10 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
private LZ4SafeDecompressor deCompressor;
private FSDataInputStream fsInputStream;
-    public FSInputLZ4CompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public FSInputLZ4CompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
        this.rowCount = rowCount;
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
int footStartOffset = columnDataStartOffset + columnDataLength - 8;
fsInputStream.seek(footStartOffset);
this.numValInBlock = fsInputStream.readInt();
@@ -86,11 +88,14 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
try {
loadNextBuffer();
} catch (IOException e) {
-                    throw new NoSuchElementException("error when read data");
+                    throw new NoSuchElementException("error when read data " + e.getMessage());
}
}
byte[] readBuffer = new byte[valLen];
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
index c6255cd..4f1ce8d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
@@ -20,23 +20,22 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
import java.io.IOException;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
public class FSInputNoCompressedColumnReader implements ColumnDataReader {
private FSDataInputStream fsInputStream;
private byte[] readBuffer;
- private int colDataStartOffset;
- private int colValLength;
private int rowCount;
-    public FSInputNoCompressedColumnReader(FSDataInputStream fsInputStream, int colDataStartOffset, int colValLength,
-            int rowCount) throws IOException {
-        this.fsInputStream = fsInputStream;
-        this.colDataStartOffset = colDataStartOffset;
+    public FSInputNoCompressedColumnReader(FileSystem fs, Path file, int colDataStartOffset, int colValLength,
+            int rowCount) throws IOException {
+ this.fsInputStream = fs.open(file);
fsInputStream.seek(colDataStartOffset);
- this.colValLength = colValLength;
this.rowCount = rowCount;
this.readBuffer = new byte[colValLength];
}
@@ -65,10 +64,13 @@ public class FSInputNoCompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
try {
fsInputStream.readFully(readBuffer);
} catch (IOException e) {
- throw new RuntimeException("error when read data", e);
+ throw new NoSuchElementException("error when read data " + e.getMessage());
}
readRowCount++;
return readBuffer;
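
Switching the constructors from an already-open FSDataInputStream to a FileSystem plus Path gives each reader sole ownership of the stream it later closes; previously close() disposed of a stream the caller had opened, inviting leaks or double-closes. The updated tests further down show the intended pairing with try-with-resources; a condensed sketch under that reading (the method name and 4-byte column width are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // The reader opens the stream in its constructor, so closing the
    // reader closes exactly the stream it opened: one owner, one close.
    static void readIntColumn(FileSystem fs, Path file, int rowCount) throws IOException {
        try (FSInputNoCompressedColumnReader reader =
                new FSInputNoCompressedColumnReader(fs, file, 0, 4, rowCount)) {
            for (byte[] val : reader) {
                // each val is one 4-byte column value
            }
        }
    }
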
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
index fb17ceb..a5c7e16 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
@@ -21,8 +21,11 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
public class FSInputRLECompressedColumnReader implements ColumnDataReader {
@@ -34,10 +37,10 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
private int rowCount;
- public FSInputRLECompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
- int columnDataLength, int rowCount) throws IOException {
+ public FSInputRLECompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+ int columnDataLength, int rowCount) throws IOException {
this.rowCount = rowCount;
- this.fsInputStream = fsInputStream;
+ this.fsInputStream = fs.open(file);
int footStartOffset = columnDataStartOffset + columnDataLength - 8;
fsInputStream.seek(footStartOffset);
this.numValInBlock = fsInputStream.readInt();
@@ -84,6 +87,9 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
if (readRLEntryValCnt >= currRLEntryValCnt) {
loadNextEntry();
}
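
For context on the readRLEntryValCnt / currRLEntryValCnt bookkeeping in this hunk: the reader walks run-length entries of the form (count, value) and serves the same value back count times. A simplified sketch of that decode loop under this reading (the field names and loadNextEntry() mirror the diff, but the exact entry layout is the class's own):

    private int currRLEntryValCnt = 0; // values covered by the current run
    private int readRLEntryValCnt = 0; // values already handed out
    private byte[] currValue;

    public byte[] next() {
        if (readRLEntryValCnt >= currRLEntryValCnt) {
            loadNextEntry(); // advance to the next (count, value) run
        }
        readRLEntryValCnt++;
        return currValue; // same value repeated until the run is spent
    }
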
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
index b923220..bb09240 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
@@ -26,6 +26,7 @@ import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.NoSuchElementException;
public class LZ4CompressedColumnReader implements ColumnDataReader {
private int rowCount;
@@ -36,16 +37,13 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
private int currBlockNum;
private LZ4SafeDecompressor deCompressor;
- private ByteBuffer dataBuffer;
private ByteBuffer decompressedBuffer;
-// private byte[] readBuffer;
private GeneralColumnDataReader blockDataReader;
public LZ4CompressedColumnReader(ByteBuffer dataBuffer, int columnDataStartOffset, int columnDataLength,
int rowCount) {
this.rowCount = rowCount;
- this.dataBuffer = dataBuffer;
int footStartOffset = columnDataStartOffset + columnDataLength - 8;
dataBuffer.position(footStartOffset);
this.numValInBlock = dataBuffer.getInt();
@@ -53,7 +51,6 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
this.blockDataReader = new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength - 8);
this.currBlockNum = -1;
-// this.readBuffer = new byte[valLen];
this.deCompressor = LZ4Factory.fastestInstance().safeDecompressor();
this.maxBufferLength = numValInBlock * valLen;
@@ -104,6 +101,9 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
loadNextBuffer();
}
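
Besides the same next() guard, this hunk removes a stored dataBuffer field and a commented-out readBuffer: the buffer is fully handed to blockDataReader in the constructor, so the retained reference and the dead comment were unused state of the kind Sonar flags. For reference, a minimal sketch of the lz4-java safe-decompressor call the class builds on, where maxBufferLength = numValInBlock * valLen bounds the decompressed block size:

    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4SafeDecompressor;

    // Decompress one LZ4 block into a preallocated destination buffer
    // and return the number of bytes produced (at most dest.length).
    static int decompressBlock(byte[] compressed, byte[] dest) {
        LZ4SafeDecompressor decompressor = LZ4Factory.fastestInstance().safeDecompressor();
        return decompressor.decompress(compressed, 0, compressed.length, dest, 0);
    }
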
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
index 77abd76..9378f3e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
@@ -67,6 +68,9 @@ public class NoCompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
byte[] readBuffer = new byte[colValLength];
dataBuffer.get(readBuffer);
readRowCount++;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
index 5346a72..131da93 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
@@ -123,6 +124,10 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
@Override
public byte[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
if (readRLEntryValCnt >= currRLEntryValCnt) {
loadNextEntry();
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
index 640862b..4d5d819 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
@@ -113,6 +113,7 @@ public class RecordsSerializer {
@Override
public Record next() {
+
deserializeRecord(oneRecord, inputBuffer);
return oneRecord;
}
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
index 04a9193..40b6443 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.Bytes;
@@ -77,8 +76,7 @@ public class LZ4CompressColumnTest {
int compressedSize = writeCompressedData(rowCnt);
System.out.println("compressed data size:" + compressedSize);
FileSystem fs = FileSystem.getLocal(new Configuration());
- FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
- try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fsInputStream, 0,
+ try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0,
compressedSize, rowCnt)) {
int k = 0;
for (byte[] val : reader) {
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
index 0b1b92c..43d97ea 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.Bytes;
@@ -74,8 +73,8 @@ public class NoCompressColumnTest {
writeNoCompressedData(rowCnt);
FileSystem fs = FileSystem.getLocal(new Configuration());
- FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
- try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fsInputStream, 0, 4, rowCnt)) {
+ try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fs,
+ new Path(tmpColFile.getAbsolutePath()), 0, 4, rowCnt)) {
int k = 0;
for (byte[] val : reader) {
assertEquals(k, Bytes.toInt(val));
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
index 3015aa7..bc31db6 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.Bytes;
@@ -89,8 +88,7 @@ public class RunLengthCompressColumnTest {
int size = writeCompressData1(rowCnt);
FileSystem fs = FileSystem.getLocal(new Configuration());
- FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
- try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fsInputStream, 0, size,
+ try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0, size,
rowCnt)) {
int k = 0;
for (byte[] val : reader) {
diff --git a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
index 9f32de1..800d4c9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
@@ -70,6 +70,10 @@ public class ProbabilityGenerator {
double[] pQueryArray = new double[nOfEle];
int sumHit = generateHitNumberList(nHitArray);
+
+ if (sumHit == 0)
+ throw new IllegalStateException();
+
for (int i = 0; i < nOfEle; i++) {
pQueryArray[i] = nHitArray[i] * 1.0 / sumHit;
}
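
The new guard matters because nHitArray[i] * 1.0 / sumHit is floating-point division: with sumHit == 0 it would not throw, it would silently turn every probability into NaN (or Infinity) and poison everything downstream, which is what Sonar's zero-divisor rule warns about. A two-line demonstration (the exception message below is illustrative; the committed code throws without one):

    double inf = 5 * 1.0 / 0; // Infinity, not ArithmeticException
    double nan = 0 * 1.0 / 0; // NaN: would flow into pQueryArray unnoticed
    if (sumHit == 0)
        throw new IllegalStateException("generateHitNumberList returned no hits");
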