minor code refactors
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fa4a5ee2 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fa4a5ee2 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fa4a5ee2 Branch: refs/heads/master-cdh5.7 Commit: fa4a5ee269eea66eee5b8ab8278305dfe0144c92 Parents: 00de441 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 22 09:59:43 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Tue Dec 27 22:15:44 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 6 ++- .../org/apache/kylin/common/util/BasicTest.java | 1 + .../gtrecord/DummyPartitionStreamer.java | 40 ++++++++++++++++++++ .../storage/gtrecord/IPartitionStreamer.java | 26 +++++++++++++ .../gtrecord/StorageResponseGTScatter.java | 9 +++-- .../apache/kylin/rest/service/QueryService.java | 6 ++- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 9 +++-- 7 files changed, 87 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8080577..d73b694 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -750,7 +750,7 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true")); } - @Deprecated//Limit is good even it's large. This config is meaning less since we already have scan threshold + @Deprecated //Limit is good even it's large. 
This config is meaningless since we already have a scan threshold public int getStoragePushDownLimitMax() { return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000")); } @@ -759,6 +759,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public int getLargeQueryThreshold() { + return Integer.parseInt(getOptional("kylin.query.large-query-threshold", String.valueOf((int) (getScanThreshold() * 0.1)))); + } + public int getDerivedInThreshold() { return Integer.parseInt(getOptional("kylin.query.derived-filter-translation-threshold", "20")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 5eaa011..9105245 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.collect.Iterators; import org.apache.commons.lang3.time.FastDateFormat; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java new file mode 100644 index 0000000..4caaed0 --- /dev/null +++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.IOException; +import java.util.Iterator; + +public class DummyPartitionStreamer implements IPartitionStreamer { + private Iterator<byte[]> iterator; + + public DummyPartitionStreamer(Iterator<byte[]> iterator) { + this.iterator = iterator; + } + + @Override + public void close() throws IOException { + //do nothing + } + + @Override + public Iterator<byte[]> asByteArrayIterator() { + return this.iterator; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java new file mode 100644 index 0000000..42f1151 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.Closeable; +import java.util.Iterator; + +public interface IPartitionStreamer extends Closeable { + public Iterator<byte[]> asByteArrayIterator(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java index fe1afd3..6283340 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -42,14 +42,16 @@ public class StorageResponseGTScatter implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class); private GTInfo info; + private IPartitionStreamer partitionStreamer; private Iterator<byte[]> blocks; private ImmutableBitSet columns; private long totalScannedCount; private int 
storagePushDownLimit = -1; - public StorageResponseGTScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { + public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { this.info = info; - this.blocks = blocks; + this.partitionStreamer = partitionStreamer; + this.blocks = partitionStreamer.asByteArrayIterator(); this.columns = columns; this.totalScannedCount = totalScannedCount; this.storagePushDownLimit = storagePushDownLimit; @@ -67,7 +69,8 @@ public class StorageResponseGTScatter implements IGTScanner { @Override public void close() throws IOException { - //do nothing + //If upper consumer failed while consuming the GTRecords, the consumer should call IGTScanner's close method to ensure releasing resource + partitionStreamer.close(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 8810c85..82e4a87 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -360,8 +360,10 @@ public class QueryService extends BasicService { sqlResponse.setDuration(System.currentTimeMillis() - startTime); logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), String.valueOf(sqlResponse.getTotalScanCount())); - if (queryCacheEnabled && !sqlResponse.getIsException() // - && (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > 
scancountThreshold)) { + if (queryCacheEnabled && // + !sqlResponse.getIsException() && // + (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold) && // + (sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold())) { //don't cache too large response cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse)); } } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/fa4a5ee2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index d99f80e..ebacb26 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -40,12 +40,13 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; @@ -127,14 +128,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC 
{ //TODO: raw scan can be constructed at region side to reduce traffic List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); rawScanByteString = serializeRawScans(rawScans); - + int coprocessorTimeout = getCoprocessorTimeoutMillis(); scanRequest.setTimeout(coprocessorTimeout); scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it scanRequestByteString = serializeGTScanReq(scanRequest); final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout); - + logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); @@ -230,7 +231,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new StorageResponseGTScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); + return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); } private ByteString serializeGTScanReq(GTScanRequest scanRequest) {