This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 0b936eb  minor, fix sonar reported bugs (#1368)
0b936eb is described below

commit 0b936eb16c7fb29d337f0e59759231030256ac28
Author: Shaofeng Shi <shaofeng...@apache.org>
AuthorDate: Sun Aug 23 16:32:45 2020 +0800

    minor, fix sonar reported bugs (#1368)

    * minor, fix sonar reported bugs
    * minor, fix sonar
    * minor, code format
---
 .../java/org/apache/kylin/common/util/Bytes.java   |  2 +-
 .../cube/inmemcubing/AbstractInMemCubeBuilder.java |  8 +--
 .../cube/inmemcubing/ICuboidGTTableWriter.java     |  8 +--
 .../kylin/cube/inmemcubing/InMemCubeBuilder.java   | 11 +--
 .../gridtable/GTAggregateTransformScanner.java     | 37 ++++++++--
 .../gridtable/GTTwoLayerAggregateScanner.java      |  2 +
 .../kylin/cube/inmemcubing/MemDiskStoreTest.java   | 11 +--
 .../kylin/gridtable/SimpleGridTableTest.java       | 84 +++++++++++-----------
 .../apache/kylin/dict/GlobalDictionaryBuilder.java | 11 +--
 .../kylin/metadata/streaming/StreamingConfig.java  |  2 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  | 26 +++----
 .../engine/mr/common/MapReduceExecutable.java      | 14 ++--
 .../mr/streaming/ColumnarSplitDataReader.java      |  2 +-
 .../kylin/engine/mr/streaming/DictsReader.java     | 44 +++++------
 .../kylin/engine/mr/streaming/RowRecordReader.java | 41 +++++------
 .../kylin/realtime/BuildCubeWithStreamV2.java      |  2 +-
 .../apache/kylin/query/relnode/OLAPContext.java    | 12 ++--
 .../coprocessor/endpoint/CubeVisitServiceTest.java | 25 +++----
 .../columnar/ColumnarMemoryStorePersister.java     | 12 ++--
 .../storage/columnar/ColumnarStoreDimDesc.java     | 13 ++--
 .../storage/columnar/ColumnarStoreMetricsDesc.java | 13 ++--
 .../columnar/FSInputGeneralColumnDataReader.java   |  6 +-
 .../storage/columnar/FragmentCuboidReader.java     | 12 ++--
 .../core/storage/columnar/SegmentMemoryStore.java  | 29 ++++----
 .../compress/FSInputLZ4CompressedColumnReader.java | 13 ++--
 .../compress/FSInputNoCompressedColumnReader.java  | 18 ++---
 .../compress/FSInputRLECompressedColumnReader.java | 12 +++-
 .../compress/LZ4CompressedColumnReader.java        |  8 +--
 .../compress/NoCompressedColumnReader.java         |  4 ++
 .../compress/RunLengthCompressedColumnReader.java  |  5 ++
 .../kylin/stream/core/util/RecordsSerializer.java  |  1 +
 .../columnar/compress/LZ4CompressColumnTest.java   |  4 +-
 .../columnar/compress/NoCompressColumnTest.java    |  5 +-
 .../compress/RunLengthCompressColumnTest.java      |  4 +-
 .../kylin/tool/query/ProbabilityGenerator.java     |  4 ++
 35 files changed, 273 insertions(+), 232 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
index 65f6fef..2f13fba 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
@@ -71,7 +71,7 @@ public class Bytes {
     /**
      * Size of boolean in bytes
      */
-    public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+    public static final int SIZEOF_BOOLEAN = 1;
 
     /**
      * Size of byte in bytes
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index df1fa7a..89c2cd8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -101,11 +101,11 @@ abstract public class AbstractInMemCubeBuilder {
     protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();
         GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            output.write(cuboidId, record);
+        try (IGTScanner scanner = gridTable.scan(req)) {
+            for (GTRecord record : scanner) {
+                output.write(cuboidId, record);
+            }
         }
-        scanner.close();
         logger.debug("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
index 93a7994..5c32d6c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -37,11 +37,11 @@ public abstract class ICuboidGTTableWriter implements ICuboidWriter{
     public void write(long cuboidId, GridTable gridTable) throws IOException {
         long startTime = System.currentTimeMillis();
         GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            write(cuboidId, record);
+        try (IGTScanner scanner = gridTable.scan(req)) {
+            for (GTRecord record : scanner) {
+                write(cuboidId, record);
+            }
         }
-        scanner.close();
         logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
     }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 403ec27..eabad14 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -445,15 +445,14 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         long startTime = System.currentTimeMillis();
         logger.info("Calculating cuboid {}", cuboidId);
 
-        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
-
         GridTable newGridTable = newGridTableByCuboidID(cuboidId);
-        GTBuilder builder = newGridTable.rebuild();
 
         ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
 
         GTRecord newRecord = new GTRecord(newGridTable.getInfo());
         int count = 0;
-        try {
+        try (GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
+                GTBuilder builder = newGridTable.rebuild()) {
             for (GTRecord record : scanner) {
                 count++;
                 for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
@@ -462,10 +461,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 }
                 builder.write(newRecord);
             }
-
-        } finally {
-            scanner.close();
-            builder.close();
-        }
 
         long timeSpent = System.currentTimeMillis() - startTime;
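
The three hunks above all make the same fix: scanners and builders that were closed manually (and leaked when the loop threw) become try-with-resources. A minimal, self-contained sketch of the idiom with stand-in Scanner/Builder classes rather than Kylin's IGTScanner/GTBuilder — both resources are closed automatically, in reverse declaration order, even when the body throws:

    import java.io.Closeable;
    import java.io.IOException;

    public class TryWithResourcesSketch {

        // Stand-ins for IGTScanner and GTBuilder; any AutoCloseable works.
        static class Scanner implements Closeable {
            int next = 0;
            boolean hasNext() { return next < 3; }
            int next() { return next++; }
            @Override public void close() { System.out.println("scanner closed"); }
        }

        static class Builder implements Closeable {
            void write(int record) { System.out.println("wrote " + record); }
            @Override public void close() { System.out.println("builder closed"); }
        }

        public static void main(String[] args) throws IOException {
            // Closed automatically in reverse order (builder, then scanner),
            // whether the loop finishes or throws -- no finally block needed.
            try (Scanner scanner = new Scanner(); Builder builder = new Builder()) {
                while (scanner.hasNext()) {
                    builder.write(scanner.next());
                }
            }
        }
    }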
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
index 0f805e8..c59ae36 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateTransformScanner.java
@@ -38,6 +38,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
     private final GTScanRequest req;
     private final GTTwoLayerAggregateParam twoLayerAggParam;
     private long inputRowCount = 0L;
+    private Iterator<GTRecord> iterator = null;
 
     public GTAggregateTransformScanner(IGTScanner inputScanner, GTScanRequest req) {
         this.inputScanner = inputScanner;
@@ -57,9 +58,13 @@ public class GTAggregateTransformScanner implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        return twoLayerAggParam.satisfyPrefix(req.getDimensions())
-                ? new FragmentTransformGTRecordIterator(inputScanner.iterator())
-                : new NormalTransformGTRecordIterator(inputScanner.iterator());
+        if (iterator == null) {
+            iterator = twoLayerAggParam.satisfyPrefix(req.getDimensions())
+                    ? new FragmentTransformGTRecordIterator(inputScanner.iterator())
+                    : new NormalTransformGTRecordIterator(inputScanner.iterator());
+        }
+
+        return iterator;
     }
 
     public long getInputRowCount() {
@@ -75,6 +80,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
 
     private class FragmentTransformGTRecordIterator extends TransformGTRecordIterator {
         private final PrefixFragmentIterator fragmentIterator;
+        private GTAggregateScanner aggregateScanner;
         private Iterator<GTRecord> transformedFragment = null;
 
         FragmentTransformGTRecordIterator(Iterator<GTRecord> input) {
@@ -88,6 +94,12 @@ public class GTAggregateTransformScanner implements IGTScanner {
             }
 
             if (!fragmentIterator.hasNext()) {
+                // close resource
+                try {
+                    aggregateScanner.close();
+                } catch (IOException e) {
+                    // do nothing
+                }
                 return false;
             }
 
@@ -106,7 +118,8 @@ public class GTAggregateTransformScanner implements IGTScanner {
                     return fragmentIterator.next();
                 }
             };
-            transformedFragment = new GTAggregateScanner(fragmentScanner, innerReq).iterator();
+            aggregateScanner = new GTAggregateScanner(fragmentScanner, innerReq);
+            transformedFragment = aggregateScanner.iterator();
 
             return hasNext();
         }
@@ -124,6 +137,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
 
     private class NormalTransformGTRecordIterator extends TransformGTRecordIterator {
         private final Iterator<GTRecord> aggRecordIterator;
+        private final GTAggregateScanner aggregateScanner;
 
         NormalTransformGTRecordIterator(final Iterator<GTRecord> input) {
             IGTScanner gtScanner = new IGTScanner() {
@@ -134,6 +148,7 @@ public class GTAggregateTransformScanner implements IGTScanner {
 
                 @Override
                 public void close() throws IOException {
+                    // do nothing
                 }
 
                 @Override
@@ -141,13 +156,21 @@ public class GTAggregateTransformScanner implements IGTScanner {
                     return input;
                 }
             };
-
-            aggRecordIterator = new GTAggregateScanner(gtScanner, innerReq).iterator();
+            aggregateScanner = new GTAggregateScanner(gtScanner, innerReq);
+            aggRecordIterator = aggregateScanner.iterator();
         }
 
         @Override
        public boolean hasNext() {
-            return aggRecordIterator.hasNext();
+            boolean hasNext = aggRecordIterator.hasNext();
+            if (!hasNext) {
+                try {
+                    aggregateScanner.close();
+                } catch (IOException e) {
+                    // do nothing
+                }
+            }
+            return hasNext;
         }
 
         @Override
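
Two things happen in GTAggregateTransformScanner: iterator() now memoizes its iterator so repeated calls share one pass over the input (and one inputRowCount), and the wrapped GTAggregateScanner is closed once iteration is exhausted. A stripped-down sketch of the memoization half, with hypothetical names:

    import java.util.Arrays;
    import java.util.Iterator;

    public class CachedIteratorSketch implements Iterable<String> {
        private final Iterable<String> input = Arrays.asList("a", "b");
        private Iterator<String> iterator = null;

        @Override
        public Iterator<String> iterator() {
            // Create the stateful (possibly resource-backed) iterator only once;
            // later calls return the same instance instead of restarting the scan.
            if (iterator == null) {
                iterator = input.iterator();
            }
            return iterator;
        }

        public static void main(String[] args) {
            CachedIteratorSketch s = new CachedIteratorSketch();
            System.out.println(s.iterator() == s.iterator()); // true
        }
    }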
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
index 5df96e2..58b5c1e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTTwoLayerAggregateScanner.java
@@ -46,6 +46,8 @@ public class GTTwoLayerAggregateScanner implements IGTScanner {
     @Override
     public void close() throws IOException {
         inputScanner.close();
+        secondLayerInputScanner.close();
+        outputScanner.close();
     }
 
     @Override
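
GTTwoLayerAggregateScanner.close() now closes all three layered scanners instead of only the innermost one. One hedge worth adding in a sketch (assumed semantics, not the Kylin implementation): closing the delegates independently, so one failing close() cannot leak the others:

    import java.io.Closeable;
    import java.io.IOException;

    public class MultiCloseSketch implements Closeable {
        private final Closeable first;
        private final Closeable second;
        private final Closeable third;

        MultiCloseSketch(Closeable first, Closeable second, Closeable third) {
            this.first = first;
            this.second = second;
            this.third = third;
        }

        @Override
        public void close() throws IOException {
            IOException failure = null;
            // Attempt every delegate; remember the first failure instead of
            // letting it abort the remaining close() calls.
            for (Closeable c : new Closeable[] { first, second, third }) {
                try {
                    c.close();
                } catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
    }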
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index f457ea6..bd147bf 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -102,11 +102,12 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
         }
         builder.close();
 
-        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
+        try (IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
         }
-        scanner.close();
     }
 }
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 14a25c5..6f3de1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -51,7 +51,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
     @Test
@@ -61,7 +61,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
     @Test
@@ -71,7 +71,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         GTBuilder builder = rebuild(table);
-        IGTScanner scanner = scanAndAggregate(table);
+        scanAndAggregate(table);
     }
 
     @Test
@@ -81,55 +81,53 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
         GridTable table = new GridTable(info, store);
 
         rebuildViaAppend(table);
-        IGTScanner scanner = scan(table);
+        scan(table);
     }
 
-    private IGTScanner scan(GridTable table) throws IOException {
+    private void scan(GridTable table) throws IOException {
         GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
-        for (GTRecord r : scanner) {
-            Object[] v = r.getValues();
-            assertTrue(((String) v[0]).startsWith("2015-"));
-            assertTrue(((String) v[2]).equals("Food"));
-            assertTrue(((Long) v[3]).longValue() == 10);
-            assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
-            System.out.println(r);
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                assertTrue(((String) v[0]).startsWith("2015-"));
+                assertTrue(((String) v[2]).equals("Food"));
+                assertTrue(((Long) v[3]).longValue() == 10);
+                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                System.out.println(r);
+            }
         }
-        scanner.close();
-        return scanner;
     }
 
-    private IGTScanner scanAndAggregate(GridTable table) throws IOException {
+    private void scanAndAggregate(GridTable table) throws IOException {
         GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0, 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[] { "count", "sum" }).setFilterPushDown(null).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
-        int i = 0;
-        for (GTRecord r : scanner) {
-            Object[] v = r.getValues();
-            switch (i) {
-            case 0:
-                assertTrue(((Long) v[3]).longValue() == 20);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
-                break;
-            case 1:
-                assertTrue(((Long) v[3]).longValue() == 30);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
-                break;
-            case 2:
-                assertTrue(((Long) v[3]).longValue() == 40);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
-                break;
-            case 3:
-                assertTrue(((Long) v[3]).longValue() == 10);
-                assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
-                break;
-            default:
-                fail();
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                Object[] v = r.getValues();
+                switch (i) {
+                case 0:
+                    assertTrue(((Long) v[3]).longValue() == 20);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
+                    break;
+                case 1:
+                    assertTrue(((Long) v[3]).longValue() == 30);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
+                    break;
+                case 2:
+                    assertTrue(((Long) v[3]).longValue() == 40);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
+                    break;
+                case 3:
+                    assertTrue(((Long) v[3]).longValue() == 10);
+                    assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
+                    break;
+                default:
+                    fail();
+                }
+                i++;
+                System.out.println(r);
             }
-            i++;
-            System.out.println(r);
         }
-        scanner.close();
-        return scanner;
     }
 
     static GTBuilder rebuild(GridTable table) throws IOException {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index d813793..ba31e2b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -46,9 +46,6 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
     @Override
     public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
         sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
-        lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
-        lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);
-
         int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
         if (hdfsDir == null) {
             //build in Kylin job server
@@ -56,12 +53,18 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
         }
         String baseDir = hdfsDir + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
 
+        lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
+        String lockPath = getLockPath(sourceColumn);
         try {
+            lock.lock(lockPath, Long.MAX_VALUE);
             this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true);
         } catch (Throwable e) {
-            lock.unlock(getLockPath(sourceColumn));
             throw new RuntimeException(
                     String.format(Locale.ROOT, "Failed to create global dictionary on %s ", sourceColumn), e);
+        } finally {
+            if (lock.isLockedByMe(lockPath)) {
+                lock.unlock(lockPath);
+            }
         }
         this.baseId = baseId;
     }
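
The GlobalDictionaryBuilder change narrows the lock's scope: acquire it right before the guarded work and release it in a finally block, unlocking only when isLockedByMe() confirms this thread actually holds it. A minimal sketch of that shape against a hypothetical DistributedLock interface (the real API comes from Kylin's DistributedLockFactory):

    public class LockScopeSketch {

        // Hypothetical subset of a distributed lock API.
        interface DistributedLock {
            void lock(String path, long timeoutMs);
            boolean isLockedByMe(String path);
            void unlock(String path);
        }

        static void buildUnderLock(DistributedLock lock, String lockPath, Runnable work) {
            try {
                lock.lock(lockPath, Long.MAX_VALUE);
                work.run();
            } finally {
                // Unlock only if this thread holds the lock; lock() may have
                // thrown before acquisition succeeded.
                if (lock.isLockedByMe(lockPath)) {
                    lock.unlock(lockPath);
                }
            }
        }
    }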
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
index 335d3c8..3de65ea 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -38,7 +38,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
 public class StreamingConfig extends RootPersistentEntity {
 
-    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+    private static final Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<>(StreamingConfig.class);
 
     public static final String STREAMING_TYPE_KAFKA = "kafka";
 
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index c49edc7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -364,12 +364,12 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         long start = System.currentTimeMillis();
         GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
                 .setFilterPushDown(filter).createGTScanRequest();
-        IGTScanner scanner = table.scan(req);
         int i = 0;
-        for (GTRecord r : scanner) {
-            i++;
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                i++;
+            }
         }
-        scanner.close();
         long end = System.currentTimeMillis();
         System.out.println(
                 (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
@@ -555,17 +555,17 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
         System.out.println(req);
-        IGTScanner scanner = table.scan(req);
-        int i = 0;
-        for (GTRecord r : scanner) {
-            System.out.println(r);
-            if (verifyRows == null || i >= verifyRows.length) {
-                Assert.fail();
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                System.out.println(r);
+                if (verifyRows == null || i >= verifyRows.length) {
+                    Assert.fail();
+                }
+                assertEquals(verifyRows[i], r.toString());
+                i++;
             }
-            assertEquals(verifyRows[i], r.toString());
-            i++;
         }
-        scanner.close();
     }
 
     public static ByteArray enc(GTInfo info, int col, String value) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4dd68a5..62fb814 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -371,17 +371,17 @@ public class MapReduceExecutable extends AbstractExecutable {
         long lockStartTime = System.currentTimeMillis();
 
         boolean isLockedByTheJob = lock.isLocked(fullLockPath);
-        logger.info("cube job {} zk lock is isLockedByTheJob:{}", getId(), isLockedByTheJob);
-        if (!isLockedByTheJob) {//not lock by the job
+        logger.info("cube job {} zk lock is isLockedByTheJob: {}", getId(), isLockedByTheJob);
+        if (!isLockedByTheJob) { //not lock by the job
             while (isLockedByOther) {
-                isLockedByOther = lock.isLocked(getCubeJobLockParentPathName());//other job global lock
+                isLockedByOther = lock.isLocked(getCubeJobLockParentPathName()); //other job global lock
 
-                if (!isLockedByOther) {//not lock by other job
+                if (!isLockedByOther) { //not lock by other job
                     isLockedByOther = lock.isLocked(ephemeralLockPath);//check the ephemeral current lock
                     logger.info("zookeeper lock path :{}, is locked by other job result is {}", ephemeralLockPath,
                             isLockedByOther);
-                    if (!isLockedByOther) {//the ephemeral lock not lock by other job
+                    if (!isLockedByOther) { //the ephemeral lock not lock by other job
                         //try to get ephemeral lock
                         try {
                             logger.debug("{} before start to get lock ephemeralLockPath {}", getId(),
@@ -413,12 +413,12 @@ public class MapReduceExecutable extends AbstractExecutable {
                         }
                     }
                 }
-                isLockedByOther = true;//get lock fail,will try again
+                isLockedByOther = true;//get lock fail, will try again
             }
 
             // wait 1 min and try again
             logger.info(
-                    "{}, parent lock path({}) is locked by other job result is {} ,ephemeral lock path :{} is locked by other job result is {},will try after one minute",
+                    "{}, parent lock path({}) is locked by other job result is {} ,ephemeral lock path: {} is locked by other job result is {}, will try after one minute",
                     getId(), getCubeJobLockParentPathName(), isLockedByOther, ephemeralLockPath, isLockedByOther);
             Thread.sleep(60000);
         }
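
The MapReduceExecutable hunks are comment and log-format cleanups, but they sit inside a ZooKeeper-lock polling loop: check the global parent lock, then the ephemeral lock, try to acquire it, and otherwise sleep a minute and retry. A compressed sketch of that control flow, with a hypothetical tryAcquire in place of Kylin's lock call:

    public class LockPollingSketch {

        interface JobLock {
            boolean isLocked(String path);
            boolean tryAcquire(String path); // hypothetical; stands in for the real acquire
        }

        static void waitForLock(JobLock lock, String parentPath, String ephemeralPath)
                throws InterruptedException {
            boolean lockedByOther = true;
            while (lockedByOther) {
                // Cheapest check first: a global lock held by another job.
                lockedByOther = lock.isLocked(parentPath);
                if (!lockedByOther) {
                    // Then the per-job ephemeral lock.
                    lockedByOther = lock.isLocked(ephemeralPath);
                    if (!lockedByOther && lock.tryAcquire(ephemeralPath)) {
                        return; // acquired
                    }
                    lockedByOther = true; // acquisition raced and failed; retry
                }
                Thread.sleep(60000L); // wait one minute before polling again
            }
        }
    }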
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
index d426cfc..9d33119 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDataReader.java
@@ -52,7 +52,7 @@ public class ColumnarSplitDataReader extends ColumnarSplitReader {
         init(inputSplit, context);
     }
 
-    public void init(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    public void init(InputSplit split, TaskAttemptContext context) throws IOException {
         baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
index b543c8b..7aa9d19 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java
@@ -36,43 +36,33 @@ import org.apache.kylin.shaded.com.google.common.collect.ImmutableMap;
 public class DictsReader extends ColumnarFilesReader {
     private static final Logger logger = LoggerFactory.getLogger(DictsReader.class);
 
-    private FSDataInputStream metaInputStream;
-    private FSDataInputStream dataInputStream;
-
     public DictsReader(Path path, FileSystem fileSystem) throws IOException {
         super(fileSystem, path);
     }
 
     public ImmutableMap<String, Dictionary> readDicts() throws IOException {
-        metaInputStream = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
-        List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.<String, Dictionary> builder();
-        dataInputStream = fs.open(dataFilePath);
-        Dictionary dict;
-        String colName;
-        logger.info("Reading dictionary from {}", dataFilePath.getName());
-        for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
-            dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            logger.info("Add dict for {}", colName);
-            builder.put(colName, dict);
+        try (FSDataInputStream metaInputStream = fs.open(metaFilePath);
+                FSDataInputStream dataInputStream = fs.open(dataFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(metaInputStream, FragmentMetaInfo.class);
+            List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
+            ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.builder();
+            Dictionary dict;
+            String colName;
+            logger.info("Reading dictionary from {}", dataFilePath.getName());
+            for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
+                dataInputStream.seek(dimDictMetaInfo.getStartOffset());
+                dict = DictionarySerializer.deserialize(dataInputStream);
+                colName = dimDictMetaInfo.getDimName();
+                logger.info("Add dict for {}", colName);
+                builder.put(colName, dict);
+            }
+            return builder.build();
         }
-        return builder.build();
     }
 
     @Override
     public void close() throws IOException{
-        try {
-            if (metaInputStream != null) {
-                metaInputStream.close();
-            }
-            if (dataInputStream != null) {
-                dataInputStream.close();
-            }
-        } catch (IOException e) {
-            logger.error("close file error", e);
-        }
+
     }
 }
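
DictsReader.readDicts() now opens the meta and data streams in a single try-with-resources header, so both are closed even when JSON parsing or dictionary deserialization fails — which is why the close() override could be emptied. The seek-then-deserialize loop looks roughly like this sketch (readDict() is a hypothetical stand-in for DictionarySerializer.deserialize, and the metadata parsing is elided):

    import java.io.DataInput;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DictStreamSketch {

        static Map<String, String> readDicts(FileSystem fs, Path metaPath, Path dataPath,
                Map<String, Long> nameToOffset) throws IOException {
            Map<String, String> dicts = new LinkedHashMap<>();
            // Both streams close automatically: data first, then meta.
            try (FSDataInputStream meta = fs.open(metaPath);
                    FSDataInputStream data = fs.open(dataPath)) {
                // (The real reader derives nameToOffset from the meta stream.)
                for (Map.Entry<String, Long> e : nameToOffset.entrySet()) {
                    data.seek(e.getValue()); // jump to this dictionary's slice
                    dicts.put(e.getKey(), readDict(data));
                }
            }
            return dicts;
        }

        // Hypothetical stand-in for DictionarySerializer.deserialize(...).
        static String readDict(DataInput in) throws IOException {
            return in.readUTF();
        }
    }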
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
index 2206fd7..1e610bd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
@@ -82,24 +82,25 @@ public class RowRecordReader extends ColumnarFilesReader {
     }
 
     public void initReaders() throws IOException {
-        FSDataInputStream in = fs.open(metaFilePath);
-        FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
-        CuboidMetaInfo basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
-        FSDataInputStream dictInputStream = fs.open(dataFilePath);
+        List<DimensionMetaInfo> allDimensions;
+        CuboidMetaInfo basicCuboidMetaInfo;
+        Map<String, DimensionEncoding> dimensionEncodingMap;
+        try (FSDataInputStream in = fs.open(metaFilePath)) {
+            FragmentMetaInfo fragmentMetaInfo = JsonUtil.readValue(in, FragmentMetaInfo.class);
+            basicCuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
+            allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
+            dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions, dataFilePath);
+        }
 
-        List<DimensionMetaInfo> allDimensions = basicCuboidMetaInfo.getDimensionsInfo();
-        Map<String, DimensionEncoding> dimensionEncodingMap = getDimensionEncodings(fragmentMetaInfo, allDimensions,
-                dictInputStream);
         dimensionColumnReaders = Lists.newArrayList();
         dimensionColumnReaderItrs = Lists.newArrayList();
         dimensionEncodings = Lists.newArrayList();
         for (DimensionMetaInfo dimensionMetaInfo : allDimensions) {
-            FSDataInputStream dimInputStream = fs.open(dataFilePath);
             String dimName = dimensionMetaInfo.getName();
             DimensionEncoding dimEncoding = dimensionEncodingMap.get(dimName);
             ColumnarStoreDimDesc dimDesc = new ColumnarStoreDimDesc(dimEncoding.getLengthOfEncoding(),
                     dimensionMetaInfo.getCompressionType());
-            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(dimInputStream,
+            ColumnDataReader dimDataReader = dimDesc.getDimReaderFromFSInput(fs, dataFilePath,
                     dimensionMetaInfo.getStartOffset(), dimensionMetaInfo.getDataLength(),
                     (int) basicCuboidMetaInfo.getNumberOfRows());
             dimensionColumnReaders.add(dimDataReader);
@@ -112,13 +113,13 @@ public class RowRecordReader extends ColumnarFilesReader {
         metricsColumnReaderItrs = Lists.newArrayList();
         metricsDataTransformers = Lists.newArrayList();
         for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) {
-            FSDataInputStream metricsInputStream = fs.open(dataFilePath);
+
             MeasureDesc measure = findMeasure(metricMetaInfo.getName());
             DataType metricsDataType = measure.getFunction().getReturnDataType();
             ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
             ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
                     metricMetaInfo.getCompressionType());
-            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(fs, dataFilePath,
                     metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
                     (int) basicCuboidMetaInfo.getNumberOfRows());
             metricsColumnReaders.add(metricsDataReader);
@@ -140,8 +141,11 @@ public class RowRecordReader extends ColumnarFilesReader {
     }
 
     private Map<String, DimensionEncoding> getDimensionEncodings(FragmentMetaInfo fragmentMetaInfo,
-            List<DimensionMetaInfo> allDimensions, FSDataInputStream dictInputStream) throws IOException {
-        Map<String, Dictionary> dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+            List<DimensionMetaInfo> allDimensions, Path dataPath) throws IOException {
+        Map<String, Dictionary> dictionaryMap;
+        try (FSDataInputStream dictInputStream = fs.open(dataPath)) {
+            dictionaryMap = readAllDimensionsDictionary(fragmentMetaInfo, dictInputStream);
+        }
 
         Map<String, DimensionEncoding> result = Maps.newHashMap();
         for (DimensionMetaInfo dimension : allDimensions) {
@@ -152,7 +156,7 @@ public class RowRecordReader extends ColumnarFilesReader {
                 Dictionary<String> dict = dictionaryMap.get(dimension.getName());
                 if (dict == null) {
                     logger.error("No dictionary found for dict-encoding column " + col);
-                    throw new RuntimeException("No dictionary found for dict-encoding column " + col);
+                    throw new IllegalStateException("No dictionary found for dict-encoding column " + col);
                 } else {
                     result.put(dimension.getName(), new DictionaryDimEnc(dict));
                 }
@@ -169,14 +173,11 @@ public class RowRecordReader extends ColumnarFilesReader {
     public Map<String, Dictionary> readAllDimensionsDictionary(FragmentMetaInfo fragmentMetaInfo,
             FSDataInputStream dataInputStream) throws IOException {
         List<DimDictionaryMetaInfo> dimDictMetaInfos = fragmentMetaInfo.getDimDictionaryMetaInfos();
-        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.<String, Dictionary> builder();
-        Dictionary dict;
-        String colName;
+        ImmutableMap.Builder<String, Dictionary> builder = ImmutableMap.builder();
 
         for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) {
             dataInputStream.seek(dimDictMetaInfo.getStartOffset());
-            dict = DictionarySerializer.deserialize(dataInputStream);
-            colName = dimDictMetaInfo.getDimName();
-            builder.put(colName, dict);
+            Dictionary dict = DictionarySerializer.deserialize(dataInputStream);
+            builder.put(dimDictMetaInfo.getDimName(), dict);
         }
         return builder.build();
     }
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index 781c62a..fb85b26 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -249,7 +249,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
         }
 
         if (consumeDataDone == false) {
-            throw new IllegalStateException("Exec timeout, data not be comsumed completely"); // ensure all messages have been comsumed.
+            throw new IllegalStateException("Exec timeout, data not be consumed completely"); // ensure all messages have been consumed.
         }
 
         waittime = 0;
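
A pattern that recurs through the rest of this commit (RowRecordReader above, the FSInput*ColumnReader classes below): instead of being handed a shared, already-open FSDataInputStream, each reader now takes a FileSystem and Path and opens its own stream, so ownership is unambiguous and close() can safely release it. A sketch with a hypothetical reader:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical reader: each instance opens, seeks, and later closes its own
    // stream, so readers over the same file never fight over one shared position.
    public class OwnedStreamReaderSketch implements java.io.Closeable {
        private final FSDataInputStream in;
        private final byte[] buffer;

        public OwnedStreamReaderSketch(FileSystem fs, Path file, long startOffset, int valueLength)
                throws IOException {
            this.in = fs.open(file);   // owned by this reader
            this.in.seek(startOffset); // position at this column's data
            this.buffer = new byte[valueLength];
        }

        public byte[] readNext() throws IOException {
            in.readFully(buffer);
            return buffer;
        }

        @Override
        public void close() throws IOException {
            in.close(); // sole owner, so closing here is always correct
        }
    }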
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 423a4f0..2f3ad4c 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -67,9 +67,9 @@ public class OLAPContext {
     public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult";
     public static final String PRM_USER_AUTHEN_INFO = "UserAuthenInfo";
 
-    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<Map<String, String>>();
+    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<>();
 
-    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<Map<Integer, OLAPContext>>();
+    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<>();
 
     public static void setParameters(Map<String, String> parameters) {
         _localPrarameters.set(parameters);
@@ -81,7 +81,7 @@ public class OLAPContext {
 
     public static void registerContext(OLAPContext ctx) {
         if (_localContexts.get() == null) {
-            Map<Integer, OLAPContext> contextMap = new HashMap<Integer, OLAPContext>();
+            Map<Integer, OLAPContext> contextMap = new HashMap<>();
             _localContexts.set(contextMap);
         }
         _localContexts.get().put(ctx.id, ctx);
@@ -147,7 +147,7 @@ public class OLAPContext {
 
     public Set<TblColRef> allColumns = new HashSet<>();
     public List<TblColRef> groupByColumns = new ArrayList<>();
-    public Set<TblColRef> subqueryJoinParticipants = new HashSet<TblColRef>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
+    public Set<TblColRef> subqueryJoinParticipants = new HashSet<>();//subqueryJoinParticipants will be added to groupByColumns(only when other group by co-exists) and allColumns
     public Set<TblColRef> metricsColumns = new HashSet<>();
     public List<FunctionDesc> aggregations = new ArrayList<>(); // storage level measure type, on top of which various sql aggr function may apply
     public List<TblColRef> aggrOutCols = new ArrayList<>(); // aggregation output (inner) columns
@@ -175,7 +175,7 @@ public class OLAPContext {
     public OLAPAuthentication olapAuthen = new OLAPAuthentication();
 
     public boolean isSimpleQuery() {
-        return (joins.size() == 0) && (groupByColumns.size() == 0) && (aggregations.size() == 0);
+        return (joins.isEmpty()) && (groupByColumns.isEmpty()) && (aggregations.isEmpty());
     }
 
     SQLDigest sqlDigest;
@@ -343,7 +343,7 @@ public class OLAPContext {
     // ============================================================================
 
     public interface IAccessController {
-        public void check(List<OLAPContext> contexts, KylinConfig config) throws IllegalStateException;
+        void check(List<OLAPContext> contexts, KylinConfig config);
     }
 
 }
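
The OLAPContext hunks are pure Sonar hygiene: the diamond operator on generic instantiations, Collection.isEmpty() instead of size() == 0, and dropping the redundant public modifier (and the unchecked throws clause) from an interface method, since interface members are implicitly public. In miniature:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SonarHygieneSketch {
        interface Checker {
            void check(List<String> items); // implicitly public; no modifier needed
        }

        public static void main(String[] args) {
            Map<Integer, String> byId = new HashMap<>(); // diamond, not new HashMap<Integer, String>()
            List<String> joins = new ArrayList<>();
            boolean simple = joins.isEmpty();            // clearer than joins.size() == 0
            System.out.println(simple + " " + byId.isEmpty());
        }
    }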
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
index 6650e6d..dfae455 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitServiceTest.java
@@ -137,18 +137,19 @@ public class CubeVisitServiceTest extends LocalFileMetadataTestCase {
         gtInfo = newInfo();
         GridTable gridTable = newTable(gtInfo);
-        IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
-                .setDimensions(null).setFilterPushDown(null).createGTScanRequest());
-        for (GTRecord record : scanner) {
-            byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
-            byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
-            System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
-                    RowConstants.ROWKEY_CUBOIDID_LEN);
-            System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
-            Put put = new Put(key);
-            put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
-            put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
-            region.put(put);
+        try (IGTScanner scanner = gridTable.scan(new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
+                .setDimensions(null).setFilterPushDown(null).createGTScanRequest())) {
+            for (GTRecord record : scanner) {
+                byte[] value = record.exportColumns(gtInfo.getPrimaryKey()).toBytes();
+                byte[] key = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN + value.length];
+                System.arraycopy(Bytes.toBytes(baseCuboid), 0, key, RowConstants.ROWKEY_SHARDID_LEN,
+                        RowConstants.ROWKEY_CUBOIDID_LEN);
+                System.arraycopy(value, 0, key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, value.length);
+                Put put = new Put(key);
+                put.addColumn(FAM[0], COL_M, record.exportColumns(gtInfo.getColumnBlock(1)).toBytes());
+                put.addColumn(FAM[1], COL_M, record.exportColumns(gtInfo.getColumnBlock(2)).toBytes());
+                region.put(put);
+            }
         }
     }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
index 8114360..02a6ad6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.Nullable;
 
@@ -125,7 +125,7 @@ public class ColumnarMemoryStorePersister {
         FileOutputStream fragmentOutputStream = FileUtils.openOutputStream(fragment.getDataFile());
         try (CountingOutputStream fragmentOut = new CountingOutputStream(
                 new BufferedOutputStream(fragmentOutputStream))) {
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
+            ConcurrentMap<String[], MeasureAggregator[]> basicCuboidData = memoryStore.getBasicCuboidData();
             List<List<Object>> basicCuboidColumnarValues = transformToColumnar(baseCuboidId, dimensions.length,
                     basicCuboidData);
             // persist dictionaries
@@ -139,13 +139,13 @@ public class ColumnarMemoryStorePersister {
             long totalRowCnt = basicCuboidMeta.getNumberOfRows();
 
             // persist additional cuboids
-            Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
+            Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> additionalCuboidsData = memoryStore
                     .getAdditionalCuboidsData();
             if (additionalCuboidsData != null && additionalCuboidsData.size() > 0) {
-                for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
+                for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidDataEntry : additionalCuboidsData
                         .entrySet()) {
                     CuboidInfo cuboidInfo = cuboidDataEntry.getKey();
-                    ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
+                    ConcurrentMap<String[], MeasureAggregator[]> cuboidData = cuboidDataEntry.getValue();
                     List<List<Object>> cuboidColumnarValues = transformToColumnar(cuboidInfo.getCuboidID(),
                             cuboidInfo.getDimCount(), cuboidData);
                     CuboidMetaInfo cuboidMeta = persistCuboidData(cuboidInfo.getCuboidID(), cuboidInfo.getDimensions(),
@@ -171,7 +171,7 @@ public class ColumnarMemoryStorePersister {
      * @return
      */
     private List<List<Object>> transformToColumnar(long cuboidId, int dimCnt,
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> aggBufMap) {
+            ConcurrentMap<String[], MeasureAggregator[]> aggBufMap) {
         Stopwatch stopwatch = Stopwatch.createUnstarted();
         stopwatch.start();
         int columnsNum = dimCnt + measures.length;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
index 971d137..b762c7e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
@@ -22,7 +22,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
@@ -81,14 +82,14 @@ public class ColumnarStoreDimDesc {
         return new NoCompressedColumnReader(dataBuffer, columnDataStartOffset, columnDataLength / rowCount, rowCount);
     }
 
-    public ColumnDataReader getDimReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getDimReaderFromFSInput(FileSystem fs, Path file, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
         if (compression == Compression.LZ4) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
         } else if (compression == Compression.RUN_LENGTH) {
-            return new FSInputRLECompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputRLECompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength, rowCount);
         }
-        return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+        return new FSInputNoCompressedColumnReader(fs, file, columnDataStartOffset, columnDataLength / rowCount,
                 rowCount);
     }
 }
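
ColumnarMemoryStorePersister (and SegmentMemoryStore below) now declare these aggregation buffers as ConcurrentMap rather than the concrete ConcurrentSkipListMap: consumers need only the map contract, and the skip-list choice stays with whoever constructs the map. A small illustration:

    import java.util.Comparator;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class InterfaceTypeSketch {

        // Consumers accept the interface; any concurrent map implementation will do.
        static int countRows(ConcurrentMap<String, long[]> cuboidData) {
            return cuboidData.size();
        }

        public static void main(String[] args) {
            // Only the construction site cares that it is a skip list (sorted keys).
            ConcurrentMap<String, long[]> data =
                    new ConcurrentSkipListMap<>(Comparator.naturalOrder());
            data.put("row1", new long[] { 42L });
            System.out.println(countRows(data));
        }
    }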
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
index eca5ae7..a3f8de9 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreMetricsDesc.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.compress.Compression;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputLZ4CompressedColumnReader;
 import org.apache.kylin.stream.core.storage.columnar.compress.FSInputNoCompressedColumnReader;
@@ -74,16 +75,16 @@ public class ColumnarStoreMetricsDesc {
         return new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength);
     }
 
-    public ColumnDataReader getMetricsReaderFromFSInput(FSDataInputStream inputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public ColumnDataReader getMetricsReaderFromFSInput(FileSystem fs, Path filePath, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
         if (Compression.LZ4 == compression && fixLen != -1) {
-            return new FSInputLZ4CompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength, rowCount);
+            return new FSInputLZ4CompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength, rowCount);
         }
         if (fixLen != -1) {
-            return new FSInputNoCompressedColumnReader(inputStream, columnDataStartOffset, columnDataLength / rowCount,
+            return new FSInputNoCompressedColumnReader(fs, filePath, columnDataStartOffset, columnDataLength / rowCount,
                     rowCount);
         }
-        return new FSInputGeneralColumnDataReader(inputStream, columnDataStartOffset, columnDataLength);
+        return new FSInputGeneralColumnDataReader(fs, filePath, columnDataStartOffset, columnDataLength);
     }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
index 5282533..363886b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
@@ -23,14 +23,16 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public class FSInputGeneralColumnDataReader implements ColumnDataReader {
     private FSDataInputStream fsInputStream;
     private int numOfVals;
 
-    public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
+    public FSInputGeneralColumnDataReader(FileSystem fs, Path file, int dataStartOffset, int dataLength)
             throws IOException {
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         fsInputStream.seek(dataStartOffset + dataLength - 4L);
         this.numOfVals = fsInputStream.readInt();
         fsInputStream.seek(dataStartOffset);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
index 2b5621b..432cbbf 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentCuboidReader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -39,14 +40,11 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
     private long readRowCount = 0;
     private int dimCnt;
     private int metricCnt;
-
-    private CubeDesc cubeDesc;
     private ColumnDataReader[] dimensionDataReaders;
     private ColumnDataReader[] metricDataReaders;
 
     public FragmentCuboidReader(CubeDesc cubeDesc, FragmentData fragmentData, CuboidMetaInfo cuboidMetaInfo,
             TblColRef[] dimensions, MeasureDesc[] measures, DimensionEncoding[] dimEncodings) {
-        this.cubeDesc = cubeDesc;
         this.dimCnt = dimensions.length;
         this.metricCnt = measures.length;
         this.dimensionDataReaders = new ColumnDataReader[dimCnt];
@@ -127,14 +125,14 @@ public class FragmentCuboidReader implements Iterable<RawRecord> {
         return new Iterator<RawRecord>() {
             @Override
             public boolean hasNext() {
-                if (readRowCount >= rowCount) {
-                    return false;
-                }
-                return true;
+                return readRowCount < rowCount;
             }
 
             @Override
             public RawRecord next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
                 for (int i = 0; i < dimensionDataReaders.length; i++) {
                     oneRawRecord.setDimension(i, dimValItr[i].next());
                 }
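
FragmentCuboidReader's iterator gains the guard that the Iterator contract requires — next() past the end must throw NoSuchElementException — and the same guard is added to SegmentMemoryStore and every compress/*ColumnReader below. The minimal form:

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    public class GuardedIteratorSketch implements Iterator<Integer> {
        private final int rowCount;
        private int readRowCount = 0;

        GuardedIteratorSketch(int rowCount) {
            this.rowCount = rowCount;
        }

        @Override
        public boolean hasNext() {
            return readRowCount < rowCount;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                // Iterator contract: signal exhaustion explicitly.
                throw new NoSuchElementException();
            }
            return readRowCount++;
        }
    }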
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
index 2bf9127..24c107d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/SegmentMemoryStore.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -64,8 +66,8 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
     protected final ParsedStreamingCubeInfo parsedStreamingCubeInfo;
     protected final String segmentName;
 
-    private volatile Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
-    private volatile ConcurrentSkipListMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
+    private volatile Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidsAggBufMap;
+    private volatile ConcurrentMap<String[], MeasureAggregator[]> basicCuboidAggBufMap;
 
     private volatile AtomicInteger rowCount = new AtomicInteger();
     private volatile AtomicInteger originRowCount = new AtomicInteger();
@@ -84,10 +86,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
         this.basicCuboidAggBufMap = new ConcurrentSkipListMap<>(StringArrayComparator.INSTANCE);
 
         List<CuboidInfo> additionalCuboids = parsedStreamingCubeInfo.getAdditionalCuboidsToBuild();
-        if (additionalCuboids != null && additionalCuboids.size() > 0) {
+        if (additionalCuboids != null && !additionalCuboids.isEmpty()) {
             this.cuboidsAggBufMap = new ConcurrentHashMap<>(additionalCuboids.size());
             for (CuboidInfo cuboidInfo : additionalCuboids) {
-                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<String[], MeasureAggregator[]>(
+                cuboidsAggBufMap.put(cuboidInfo, new ConcurrentSkipListMap<>(
                         StringArrayComparator.INSTANCE));
             }
         }
@@ -107,10 +109,10 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
         Object[] metricsValues = buildValue(row);
         aggregate(basicCuboidAggBufMap, basicCuboidDimensions, metricsValues);
         if (cuboidsAggBufMap != null) {
-            for (Entry<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
+            for (Entry<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> cuboidAggEntry : cuboidsAggBufMap
                     .entrySet()) {
                 CuboidInfo cuboidInfo = cuboidAggEntry.getKey();
-                ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
+                ConcurrentMap<String[], MeasureAggregator[]> cuboidAggMap = cuboidAggEntry.getValue();
                 String[] cuboidDimensions = buildCuboidKey(cuboidInfo, row);
                 aggregate(cuboidAggMap, cuboidDimensions, metricsValues);
             }
@@ -172,7 +174,7 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
     }
 
     @SuppressWarnings("unchecked")
-    private void aggregate(ConcurrentSkipListMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
+    private void aggregate(ConcurrentMap<String[], MeasureAggregator[]> cuboidAggBufMap, String[] dimensions,
             Object[] metricsValues) {
         MeasureAggregator[] aggrs = cuboidAggBufMap.get(dimensions);
         if (aggrs != null) {
@@ -217,20 +219,20 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
         return originRowCount.get();
     }
 
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getBasicCuboidData() {
+    public ConcurrentMap<String[], MeasureAggregator[]> getBasicCuboidData() {
         return basicCuboidAggBufMap;
     }
 
-    public Map<CuboidInfo, ConcurrentSkipListMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
+    public Map<CuboidInfo, ConcurrentMap<String[], MeasureAggregator[]>> getAdditionalCuboidsData() {
         return cuboidsAggBufMap;
     }
 
-    public ConcurrentSkipListMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
+    public ConcurrentMap<String[], MeasureAggregator[]> getCuboidData(long cuboidID) {
         if (cuboidID == parsedStreamingCubeInfo.basicCuboid.getId()) {
             return basicCuboidAggBufMap;
         } else {
             CuboidInfo cuboidInfo = new CuboidInfo(cuboidID);
-            ConcurrentSkipListMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
+            ConcurrentMap<String[], MeasureAggregator[]> result = cuboidsAggBufMap.get(cuboidInfo);
             if (result != null) {
                 return result;
             }
@@ -357,6 +359,9 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
 
         @Override
         public Record next() {
+            if (nextEntry == null)
+                throw new NoSuchElementException();
+
             try {
                 String[] allDimensions = nextEntry.getKey();
                 MeasureAggregator<?>[] allMetrics = nextEntry.getValue();
@@ -389,8 +394,6 @@ public class SegmentMemoryStore implements IStreamingGTSearcher {
                 System.arraycopy(aggrResult, 0, oneRecord.getMetrics(), 0, aggrResult.length);
                 count++;
                 return oneRecord;
-            } catch (RuntimeException exception) {
-                throw exception;
             } finally {
                 nextEntry = null;
             }
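
The last SegmentMemoryStore hunk deletes a catch (RuntimeException e) { throw e; } block that only rethrew: a try/finally alone already runs the cleanup (nextEntry = null) and lets exceptions propagate untouched. Compare:

    public class CatchRethrowSketch {
        private static Object nextEntry = new Object();

        static Object consumeNext() {
            // No catch block needed: finally runs whether the body returns or
            // throws, and an unhandled RuntimeException propagates by itself.
            try {
                return process(nextEntry);
            } finally {
                nextEntry = null;
            }
        }

        static Object process(Object entry) {
            if (entry == null) {
                throw new IllegalStateException("no entry");
            }
            return entry;
        }

        public static void main(String[] args) {
            System.out.println(consumeNext());
        }
    }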
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
index 0de561e..a00134f 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputLZ4CompressedColumnReader.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 import net.jpountz.lz4.LZ4Factory;
@@ -40,10 +42,10 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
     private LZ4SafeDecompressor deCompressor;
     private FSDataInputStream fsInputStream;
 
-    public FSInputLZ4CompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public FSInputLZ4CompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
         this.rowCount = rowCount;
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         fsInputStream.seek(footStartOffset);
         this.numValInBlock = fsInputStream.readInt();
@@ -86,11 +88,14 @@ public class FSInputLZ4CompressedColumnReader implements ColumnDataReader {
 
         @Override
         public byte[] next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
             if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
                 try {
                     loadNextBuffer();
                 } catch (IOException e) {
-                    throw new NoSuchElementException("error when read data");
+                    throw new NoSuchElementException("error when read data " + e.getMessage());
                 }
             }
             byte[] readBuffer = new byte[valLen];
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
index c6255cd..4f1ce8d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputNoCompressedColumnReader.java
@@ -20,23 +20,22 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 public class FSInputNoCompressedColumnReader implements ColumnDataReader {
     private FSDataInputStream fsInputStream;
     private byte[] readBuffer;
-    private int colDataStartOffset;
-    private int colValLength;
     private int rowCount;
 
-    public FSInputNoCompressedColumnReader(FSDataInputStream fsInputStream, int colDataStartOffset, int colValLength,
-            int rowCount) throws IOException {
-        this.fsInputStream = fsInputStream;
-        this.colDataStartOffset = colDataStartOffset;
+    public FSInputNoCompressedColumnReader(FileSystem fs, Path file, int colDataStartOffset, int colValLength,
+            int rowCount) throws IOException {
+        this.fsInputStream = fs.open(file);
         fsInputStream.seek(colDataStartOffset);
-        this.colValLength = colValLength;
         this.rowCount = rowCount;
         this.readBuffer = new byte[colValLength];
     }
@@ -65,10 +64,13 @@ public class FSInputNoCompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
         try {
             fsInputStream.readFully(readBuffer);
         } catch (IOException e) {
-            throw new RuntimeException("error when read data", e);
+            throw new NoSuchElementException("error when read data " + e.getMessage());
         }
         readRowCount++;
         return readBuffer;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
index fb17ceb..a5c7e16 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/FSInputRLECompressedColumnReader.java
@@ -21,8 +21,11 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
 public class FSInputRLECompressedColumnReader implements ColumnDataReader {
@@ -34,10 +37,10 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
 
     private int rowCount;
 
-    public FSInputRLECompressedColumnReader(FSDataInputStream fsInputStream, int columnDataStartOffset,
-            int columnDataLength, int rowCount) throws IOException {
+    public FSInputRLECompressedColumnReader(FileSystem fs, Path file, int columnDataStartOffset,
+            int columnDataLength, int rowCount) throws IOException {
         this.rowCount = rowCount;
-        this.fsInputStream = fsInputStream;
+        this.fsInputStream = fs.open(file);
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         fsInputStream.seek(footStartOffset);
         this.numValInBlock = fsInputStream.readInt();
@@ -84,6 +87,9 @@ public class FSInputRLECompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
         if (readRLEntryValCnt >= currRLEntryValCnt) {
             loadNextEntry();
         }
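Alongside the contract fix, the FSInput* constructors change from taking an FSDataInputStream to taking a FileSystem plus Path, so each reader opens, and can be expected to close, its own stream rather than borrowing one of unclear ownership. A hedged usage sketch: the file path, offsets, and row count below are invented, while the try-with-resources and for-each shapes mirror the updated tests later in this commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.stream.core.storage.columnar.compress.FSInputRLECompressedColumnReader;

    public class ReaderUsageSketch {
        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path file = new Path("/tmp/col.data"); // hypothetical column file
            int startOffset = 0;                   // hypothetical fragment layout
            int dataLength = 4096;
            int rowCount = 1024;
            // The reader now opens the stream itself from (fs, file), so a single
            // try-with-resources releases the underlying FSDataInputStream.
            try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fs, file,
                    startOffset, dataLength, rowCount)) {
                for (byte[] val : reader) {
                    // consume each fixed-length value
                }
            }
        }
    }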
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
index b923220..bb09240 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressedColumnReader.java
@@ -26,6 +26,7 @@ import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 public class LZ4CompressedColumnReader implements ColumnDataReader {
     private int rowCount;
@@ -36,16 +37,13 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
     private int currBlockNum;
 
     private LZ4SafeDecompressor deCompressor;
-    private ByteBuffer dataBuffer;
     private ByteBuffer decompressedBuffer;
-//    private byte[] readBuffer;
 
     private GeneralColumnDataReader blockDataReader;
 
     public LZ4CompressedColumnReader(ByteBuffer dataBuffer, int columnDataStartOffset, int columnDataLength,
             int rowCount) {
         this.rowCount = rowCount;
-        this.dataBuffer = dataBuffer;
         int footStartOffset = columnDataStartOffset + columnDataLength - 8;
         dataBuffer.position(footStartOffset);
         this.numValInBlock = dataBuffer.getInt();
@@ -53,7 +51,6 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
 
         this.blockDataReader = new GeneralColumnDataReader(dataBuffer, columnDataStartOffset, columnDataLength - 8);
         this.currBlockNum = -1;
-//        this.readBuffer = new byte[valLen];
         this.deCompressor = LZ4Factory.fastestInstance().safeDecompressor();
 
         this.maxBufferLength = numValInBlock * valLen;
@@ -104,6 +101,9 @@ public class LZ4CompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
         if (currBlockNum == -1 || !decompressedBuffer.hasRemaining()) {
             loadNextBuffer();
         }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
index 77abd76..9378f3e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 
@@ -67,6 +68,9 @@ public class NoCompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
         byte[] readBuffer = new byte[colValLength];
         dataBuffer.get(readBuffer);
         readRowCount++;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
index 5346a72..131da93 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressedColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.core.storage.columnar.compress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 import org.apache.kylin.stream.core.storage.columnar.ColumnDataReader;
 import org.apache.kylin.stream.core.storage.columnar.GeneralColumnDataReader;
@@ -123,6 +124,10 @@ public class RunLengthCompressedColumnReader implements ColumnDataReader {
 
     @Override
     public byte[] next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
         if (readRLEntryValCnt >= currRLEntryValCnt) {
             loadNextEntry();
         }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
index 640862b..4d5d819 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RecordsSerializer.java
@@ -113,6 +113,7 @@ public class RecordsSerializer {
 
         @Override
         public Record next() {
+            deserializeRecord(oneRecord, inputBuffer);
             return oneRecord;
         }
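The RecordsSerializer change looks like a genuine bug fix rather than a lint cleanup: next() returned the shared oneRecord object without first deserializing anything into it. A sketch of the reuse pattern under assumed names, where LongRecordIterator and a long[]-based deserializeRecord are invented stand-ins for Kylin's Record machinery:

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Hypothetical stand-in: one mutable record object is reused across calls,
    // so next() must refill it before handing it out.
    class LongRecordIterator implements Iterator<long[]> {
        private final ByteBuffer inputBuffer;
        private final long[] oneRecord = new long[1]; // reused on every call

        LongRecordIterator(ByteBuffer inputBuffer) {
            this.inputBuffer = inputBuffer;
        }

        @Override
        public boolean hasNext() {
            return inputBuffer.remaining() >= Long.BYTES;
        }

        @Override
        public long[] next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            deserializeRecord(oneRecord, inputBuffer); // the step the fix restores
            return oneRecord;
        }

        private void deserializeRecord(long[] record, ByteBuffer buffer) {
            record[0] = buffer.getLong();
        }
    }

Because the same object is handed out on every call, callers that keep a record across iterations must copy it first; that trade-off is inherent to the reuse pattern, not introduced by this fix.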
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
index 04a9193..40b6443 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/LZ4CompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -77,8 +76,7 @@ public class LZ4CompressColumnTest {
         int compressedSize = writeCompressedData(rowCnt);
         System.out.println("compressed data size:" + compressedSize);
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fsInputStream, 0,
+        try (FSInputLZ4CompressedColumnReader reader = new FSInputLZ4CompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0,
                 compressedSize, rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
index 0b1b92c..43d97ea 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/NoCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -74,8 +73,8 @@ public class NoCompressColumnTest {
         writeNoCompressedData(rowCnt);
 
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fsInputStream, 0, 4, rowCnt)) {
+        try (FSInputNoCompressedColumnReader reader = new FSInputNoCompressedColumnReader(fs,
+                new Path(tmpColFile.getAbsolutePath()), 0, 4, rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
                 assertEquals(k, Bytes.toInt(val));
diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
index 3015aa7..bc31db6 100644
--- a/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
+++ b/stream-core/src/test/java/org/apache/kylin/stream/core/storage/columnar/compress/RunLengthCompressColumnTest.java
@@ -28,7 +28,6 @@ import java.nio.channels.FileChannel.MapMode;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.Bytes;
@@ -89,8 +88,7 @@ public class RunLengthCompressColumnTest {
         int size = writeCompressData1(rowCnt);
 
         FileSystem fs = FileSystem.getLocal(new Configuration());
-        FSDataInputStream fsInputStream = fs.open(new Path(tmpColFile.getAbsolutePath()));
-        try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fsInputStream, 0, size,
+        try (FSInputRLECompressedColumnReader reader = new FSInputRLECompressedColumnReader(fs, new Path(tmpColFile.getAbsolutePath()), 0, size,
                 rowCnt)) {
             int k = 0;
             for (byte[] val : reader) {
diff --git a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
index 9f32de1..800d4c9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/query/ProbabilityGenerator.java
@@ -70,6 +70,10 @@ public class ProbabilityGenerator {
         double[] pQueryArray = new double[nOfEle];
 
         int sumHit = generateHitNumberList(nHitArray);
+
+        if (sumHit == 0)
+            throw new IllegalStateException();
+
         for (int i = 0; i < nOfEle; i++) {
             pQueryArray[i] = nHitArray[i] * 1.0 / sumHit;
         }
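The ProbabilityGenerator guard closes a silent-failure path: in Java, floating-point division by zero does not throw, so with sumHit == 0 (and non-negative hit counts) every pQueryArray entry would evaluate to NaN (0.0 / 0.0) and flow onward undetected. An illustrative sketch of the normalization with the fail-fast guard; the method and variable names here are invented:

    // Normalize hit counts into probabilities, failing fast on an empty total.
    public class NormalizeSketch {
        public static double[] normalize(int[] hits) {
            int sum = 0;
            for (int h : hits) {
                sum += h;
            }
            if (sum == 0) {
                // Without this, hits[i] * 1.0 / sum evaluates to NaN instead of throwing.
                throw new IllegalStateException("total hit count must be positive");
            }
            double[] p = new double[hits.length];
            for (int i = 0; i < hits.length; i++) {
                p[i] = hits[i] * 1.0 / sum;
            }
            return p;
        }
    }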