temp2
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f1a7a86 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f1a7a86 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f1a7a86 Branch: refs/heads/mhb22 Commit: 1f1a7a8698024621e5b7f84cfe37eb23e691c9dd Parents: 8f6fbd4 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 22 20:30:54 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Dec 22 20:33:52 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/util/BasicTest.java | 1 + .../gtrecord/DummyPartitionStreamer.java | 40 ++++++++++++++++++++ .../storage/gtrecord/IPartitionStreamer.java | 26 +++++++++++++ .../gtrecord/StorageResponseGTScatter.java | 8 ++-- .../apache/kylin/rest/service/QueryService.java | 26 +++++++++++-- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 9 +++-- 6 files changed, 99 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 5eaa011..9105245 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.collect.Iterators; import org.apache.commons.lang3.time.FastDateFormat; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java new file mode 100644 index 0000000..4caaed0 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/DummyPartitionStreamer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.IOException; +import java.util.Iterator; + +public class DummyPartitionStreamer implements IPartitionStreamer { + private Iterator<byte[]> iterator; + + public DummyPartitionStreamer(Iterator<byte[]> iterator) { + this.iterator = iterator; + } + + @Override + public void close() throws IOException { + //do nothing + } + + @Override + public Iterator<byte[]> asByteArrayIterator() { + return this.iterator; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java new file mode 100644 index 0000000..42f1151 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IPartitionStreamer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.Closeable; +import java.util.Iterator; + +public interface IPartitionStreamer extends Closeable { + public Iterator<byte[]> asByteArrayIterator(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java index 88116b5..6283340 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -42,14 +42,16 @@ public class StorageResponseGTScatter implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class); private GTInfo info; + private IPartitionStreamer partitionStreamer; private Iterator<byte[]> blocks; private ImmutableBitSet columns; private long totalScannedCount; private int storagePushDownLimit = -1; - public StorageResponseGTScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { + public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { this.info = info; - this.blocks = blocks; + this.partitionStreamer = partitionStreamer; + this.blocks = partitionStreamer.asByteArrayIterator(); this.columns = columns; this.totalScannedCount = totalScannedCount; this.storagePushDownLimit = storagePushDownLimit; @@ -68,7 +70,7 @@ public class StorageResponseGTScatter implements IGTScanner { @Override public void close() throws IOException { //If upper consumer failed while consuming the GTRecords, the consumer should call IGTScanner's close method to ensure releasing resource - Iterators. + partitionStreamer.close(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 82bc407..82e4a87 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -19,8 +19,27 @@ package org.apache.kylin.rest.service; import java.io.IOException; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import javax.annotation.PostConstruct; import javax.sql.DataSource; @@ -344,8 +363,7 @@ public class QueryService extends BasicService { if (queryCacheEnabled && // !sqlResponse.getIsException() && // (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold) && // - (sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold())) //don't cache too large response - { + (sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold())) { //don't cache too large response cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse)); } } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/1f1a7a86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index d99f80e..ebacb26 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -40,12 +40,13 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; @@ -127,14 +128,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { //TODO: raw scan can be constructed at region side to reduce traffic List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); rawScanByteString = serializeRawScans(rawScans); - + int coprocessorTimeout = getCoprocessorTimeoutMillis(); scanRequest.setTimeout(coprocessorTimeout); scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it scanRequestByteString = serializeGTScanReq(scanRequest); final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout); - + logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); @@ -230,7 +231,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new StorageResponseGTScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); + return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); } private ByteString serializeGTScanReq(GTScanRequest scanRequest) {