KYLIN-2437 collect number of bytes scanned to query metrics
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e09338b3 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e09338b3 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e09338b3 Branch: refs/heads/master-hbase0.98 Commit: e09338b34c0b07a7167096e45bf9185aa0d0cbd5 Parents: ecf6a69 Author: gaodayue <gaoda...@meituan.com> Authored: Wed Feb 8 13:59:31 2017 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Fri Feb 10 15:39:11 2017 +0800 ---------------------------------------------------------------------- .../kylin/gridtable/GTAggregateScanner.java | 11 +- .../apache/kylin/gridtable/GTScanRequest.java | 10 +- .../apache/kylin/gridtable/ScannerWorker.java | 64 --------- .../apache/kylin/storage/StorageContext.java | 9 ++ .../storage/gtrecord/CubeSegmentScanner.java | 3 +- .../kylin/storage/gtrecord/ScannerWorker.java | 71 ++++++++++ .../apache/kylin/rest/response/SQLResponse.java | 10 ++ .../apache/kylin/rest/service/QueryService.java | 9 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 7 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 6 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 17 ++- .../coprocessor/endpoint/CubeVisitService.java | 142 +++++++++++-------- .../endpoint/generated/CubeVisitProtos.java | 107 ++++++++++++-- .../endpoint/protobuf/CubeVisit.proto | 1 + 14 files changed, 309 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 147dbc1..dd359f8 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -65,7 +65,6 @@ public class GTAggregateScanner implements IGTScanner { final AggregationCache aggrCache; final long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX - final long deadline; final boolean spillEnabled; private int aggregatedRowCount = 0; @@ -73,10 +72,10 @@ public class GTAggregateScanner implements IGTScanner { private boolean[] aggrMask; public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) { - this(inputScanner, req, Long.MAX_VALUE, true); + this(inputScanner, req, true); } - public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline, boolean spillEnabled) { + public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean spillEnabled) { if (!req.hasAggregation()) throw new IllegalStateException(); @@ -90,7 +89,6 @@ public class GTAggregateScanner implements IGTScanner { this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB); this.aggrMask = new boolean[metricsAggrFuncs.length]; this.storagePushDownLimit = req.getStoragePushDownLimit(); - this.deadline = deadline; this.spillEnabled = spillEnabled; Arrays.fill(aggrMask, true); @@ -145,11 +143,6 @@ public class GTAggregateScanner implements IGTScanner { long count = 0; for (GTRecord r : inputScanner) { - //check deadline - if (count % GTScanRequest.terminateCheckInterval == 1 && System.currentTimeMillis() > deadline) { - throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count); - } - if (getNumOfSpills() == 0) { //check limit boolean ret = aggrCache.aggregate(r, storagePushDownLimit); http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 7c94f5a..651e5c4 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -156,7 +156,7 @@ public class GTScanRequest { } public IGTScanner decorateScanner(IGTScanner scanner) throws IOException { - return decorateScanner(scanner, true, true, Long.MAX_VALUE); + return decorateScanner(scanner, true, true); } /** @@ -165,14 +165,14 @@ public class GTScanRequest { * * Refer to CoprocessorBehavior for explanation */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, long deadline) throws IOException { - return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, deadline, true); + public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn) throws IOException { + return decorateScanner(scanner, filterToggledOn, aggrToggledOn, false, true); } /** * hasPreFiltered indicate the data has been filtered before scanning */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, long deadline, boolean spillEnabled) throws IOException { + public IGTScanner decorateScanner(IGTScanner scanner, boolean filterToggledOn, boolean aggrToggledOn, boolean hasPreFiltered, boolean spillEnabled) throws IOException { IGTScanner result = scanner; if (!filterToggledOn) { //Skip reading this section if you're not profiling! int scanned = lookAndForget(result); @@ -194,7 +194,7 @@ public class GTScanRequest { } else if (this.hasAggregation()) { logger.info("pre aggregating results before returning"); this.doingStorageAggregation = true; - result = new GTAggregateScanner(result, this, deadline, spillEnabled); + result = new GTAggregateScanner(result, this, spillEnabled); } else { logger.info("has no aggregation, skip it"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java deleted file mode 100644 index f26d993..0000000 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.gridtable; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.Iterator; - -import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ScannerWorker { - - private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); - private IGTScanner internal = null; - - public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) { - if (scanRequest == null) { - logger.info("Segment {} will be skipped", segment); - internal = new EmptyGTScanner(0); - return; - } - - final GTInfo info = scanRequest.getInfo(); - - try { - IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior - internal = rpc.getGTScanner(scanRequest); - } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { - throw new RuntimeException(e); - } - } - - public Iterator<GTRecord> iterator() { - return internal.iterator(); - } - - public void close() throws IOException { - internal.close(); - } - - public long getScannedRowCount() { - return internal.getScannedRowCount(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index ab0ea73..708dfde 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -50,6 +50,7 @@ public class StorageContext { private IStorageQuery storageQuery; private AtomicLong totalScanCount = new AtomicLong(); + private AtomicLong totalScanBytes = new AtomicLong(); private Cuboid cuboid; private boolean partialResultReturned = false; @@ -161,6 +162,14 @@ public class StorageContext { return this.totalScanCount.addAndGet(count); } + public long getTotalScanBytes() { + return totalScanBytes.get(); + } + + public long increaseTotalScanBytes(long bytes) { + return totalScanBytes.addAndGet(bytes); + } + public boolean isAcceptPartialResult() { return acceptPartialResult; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 9d6f946..974b8ea 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -30,7 +30,6 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.gridtable.ScannerWorker; import org.apache.kylin.metadata.filter.ITupleFilterTransformer; import org.apache.kylin.metadata.filter.StringCodeSystem; import org.apache.kylin.metadata.filter.TupleFilter; @@ -79,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner { } scanRequest = scanRangePlanner.planScanRequest(); String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); - scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java new file mode 100644 index 0000000..2a2a86a --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.gridtable.EmptyGTScanner; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.gridtable.IGTStorage; +import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; + +public class ScannerWorker { + + private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); + private IGTScanner internal = null; + + public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) { + if (scanRequest == null) { + logger.info("Segment {} will be skipped", segment); + internal = new EmptyGTScanner(0); + return; + } + + final GTInfo info = scanRequest.getInfo(); + + try { + IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior + internal = rpc.getGTScanner(scanRequest); + } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + public Iterator<GTRecord> iterator() { + return internal.iterator(); + } + + public void close() throws IOException { + internal.close(); + } + + public long getScannedRowCount() { + return internal.getScannedRowCount(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 9c4e9da..387e6c9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -55,6 +55,8 @@ public class SQLResponse implements Serializable { protected long totalScanCount; + protected long totalScanBytes; + protected boolean hitExceptionCache = false; protected boolean storageCacheUsed = false; @@ -150,6 +152,14 @@ public class SQLResponse implements Serializable { this.totalScanCount = totalScanCount; } + public long getTotalScanBytes() { + return totalScanBytes; + } + + public void setTotalScanBytes(long totalScanBytes) { + this.totalScanBytes = totalScanBytes; + } + public boolean isHitExceptionCache() { return hitExceptionCache; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 7ce38ea..9ccda03 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -276,6 +276,7 @@ public class QueryService extends BasicService { stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine); stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine); stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine); + stringBuilder.append("Total scan bytes: ").append(response.getTotalScanBytes()).append(newLine); stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine); stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine); stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine); @@ -580,15 +581,18 @@ public class QueryService extends BasicService { boolean isPartialResult = false; String cube = ""; - StringBuilder sb = new StringBuilder("Scan count for each storageContext: "); + StringBuilder sb = new StringBuilder("Scan stats for each storageContext: "); long totalScanCount = 0; + long totalScanBytes = 0; if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { if (ctx.realization != null) { isPartialResult |= ctx.storageContext.isPartialResultReturned(); cube = ctx.realization.getName(); totalScanCount += ctx.storageContext.getTotalScanCount(); - sb.append(ctx.storageContext.getTotalScanCount() + ","); + totalScanBytes += ctx.storageContext.getTotalScanBytes(); + sb.append("{rows=").append(ctx.storageContext.getTotalScanCount()). + append(" bytes=").append(ctx.storageContext.getTotalScanBytes()).append("} "); } } } @@ -596,6 +600,7 @@ public class QueryService extends BasicService { SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult); response.setTotalScanCount(totalScanCount); + response.setTotalScanBytes(totalScanBytes); return response; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index dd9f74c..a2b2611 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } private byte[] getByteArrayForShort(short v) { @@ -198,6 +199,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (region == null) return; + context.increaseTotalScanBytes(result.getStats().getScannedBytes()); totalScannedCount.addAndGet(result.getStats().getScannedRowCount()); logger.info(logHeader + getStatsString(region, result)); @@ -280,6 +282,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Stats stats = result.getStats(); sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". "); + sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". "); sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". "); sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). "); sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append("."); http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 05b34c7..11fbb19 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.IGTStorage; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,15 +63,18 @@ public abstract class CubeHBaseRPC implements IGTStorage { final protected CubeSegment cubeSeg; final protected Cuboid cuboid; final protected GTInfo fullGTInfo; + final protected StorageContext context; + final private RowKeyEncoder fuzzyKeyEncoder; final private RowKeyEncoder fuzzyMaskEncoder; - public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; + this.context = context; this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index a52af90..b94346c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; @@ -41,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { - super(segment, cuboid, fullGTInfo); + public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) { + super(segment, cuboid, fullGTInfo, context); } @Override @@ -180,12 +182,15 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); CellListIterator cellListIterator = new CellListIterator() { + long scanBytes = 0; + @Override public void close() throws IOException { for (ResultScanner scanner : scanners) { scanner.close(); } hbaseTable.close(); + context.increaseTotalScanBytes(scanBytes); } @Override @@ -195,7 +200,11 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { @Override public List<Cell> next() { - return allResultsIterator.next().listCells(); + List<Cell> result = allResultsIterator.next().listCells(); + for (Cell cell : result) { + scanBytes += CellUtil.estimatedSizeOf(cell); + } + return result; } @Override @@ -232,4 +241,4 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3e0a065..1f6425f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -33,6 +33,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Scan; @@ -87,7 +88,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private long serviceStartTime; - static class InnerScannerAsIterator implements CellListIterator { + abstract static class BaseCellListIterator implements CellListIterator { + @Override + public final void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public final void close() throws IOException { + // no op. we close all region scanners at final block. + } + } + + static class InnerScannerAsIterator extends BaseCellListIterator { private RegionScanner regionScanner; private List<Cell> nextOne = Lists.newArrayList(); private List<Cell> ret = Lists.newArrayList(); @@ -127,15 +140,58 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } return ret; } + } + + // TODO move this logic to HBaseReadonlyStore once it's been refactored + static class ResourceTrackingCellListIterator extends BaseCellListIterator { + private final Iterator<List<Cell>> delegate; + private final long rowCountLimit; + private final long bytesLimit; + private final long timeout; + private final long deadline; + + private long rowCount; + private long rowBytes; + + ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, + long rowCountLimit, long bytesLimit, long timeout) { + this.delegate = delegate; + this.rowCountLimit = rowCountLimit; + this.bytesLimit = bytesLimit; + this.timeout = timeout; + this.deadline = System.currentTimeMillis() + timeout; + } @Override - public void remove() { - throw new UnsupportedOperationException(); + public boolean hasNext() { + if (rowCount > rowCountLimit) { + throw new GTScanExceedThresholdException("Number of rows scanned exceeds threshold " + rowCountLimit); + } + if (rowBytes > bytesLimit) { + throw new GTScanExceedThresholdException("Scanned " + rowBytes + " bytes exceeds threshold " + bytesLimit); + } + if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { + throw new GTScanTimeoutException("Scan timeout after " + timeout + " ms"); + } + return delegate.hasNext(); } @Override - public void close() throws IOException { - //does not need to close as regionScanner will be closed in finally block + public List<Cell> next() { + List<Cell> result = delegate.next(); + rowCount++; + for (Cell cell : result) { + rowBytes += CellUtil.estimatedSizeOf(cell); + } + return result; + } + + public long getTotalScannedRowCount() { + return rowCount; + } + + public long getTotalScannedRowBytes() { + return rowBytes; } } @@ -237,51 +293,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } final MutableBoolean scanNormalComplete = new MutableBoolean(true); - final long deadline = serviceStartTime + scanReq.getTimeout(); - logger.info("deadline(local) is " + deadline); final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); - final CellListIterator cellListIterator = new CellListIterator() { - - int counter = 0; - - @Override - public void close() throws IOException { - for (CellListIterator closeable : cellListsForeachRawScan) { - closeable.close(); - } - } - - @Override - public boolean hasNext() { - - counter++; - - if (counter > scanReq.getStorageScanRowNumThreshold()) { - throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter + ", consider increasing kylin.query.memory-budget-bytes and kylin.query.scan-threshold"); - } - - if (counter % (10 * GTScanRequest.terminateCheckInterval) == 1) { - logger.info("scanning " + counter + "th row from HBase."); - } - return allCellLists.hasNext(); - } - - @Override - public List<Cell> next() { - return allCellLists.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; + ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator( + allCellLists, + scanReq.getStorageScanRowNumThreshold(), + Long.MAX_VALUE, + scanReq.getTimeout()); IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn()); IGTScanner rawScanner = store.scan(scanReq); - IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, deadline, request.getSpillEnabled()); + IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled()); ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); @@ -290,13 +313,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { for (GTRecord oneRecord : finalScanner) { - - if (finalRowCount % GTScanRequest.terminateCheckInterval == 1) { - if (System.currentTimeMillis() > deadline) { - throw new GTScanTimeoutException("finalScanner timeouts after contributed " + finalRowCount); - } - } - buffer.clear(); try { oneRecord.exportColumns(scanReq.getColumns(), buffer); @@ -326,6 +342,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } appendProfileInfo(sb, "agg done"); + logger.info("Total scanned {} rows and {} bytes", + cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes()); //outputStream.close() is not necessary byte[] compressedAllRows; @@ -341,6 +359,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } appendProfileInfo(sb, "compress done"); + logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length); OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); @@ -353,16 +372,17 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies - setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().// - setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).// - setScannedRowCount(finalScanner.getScannedRowCount()).// - setServiceStartTime(serviceStartTime).// - setServiceEndTime(System.currentTimeMillis()).// - setSystemCpuLoad(systemCpuLoad).// - setFreePhysicalMemorySize(freePhysicalMemorySize).// - setFreeSwapSpaceSize(freeSwapSpaceSize).// - setHostname(InetAddress.getLocalHost().getHostName()).// - setEtcMsg(sb.toString()).// + setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder(). + setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount). + setScannedRowCount(cellListIterator.getTotalScannedRowCount()). + setScannedBytes(cellListIterator.getTotalScannedRowBytes()). + setServiceStartTime(serviceStartTime). + setServiceEndTime(System.currentTimeMillis()). + setSystemCpuLoad(systemCpuLoad). + setFreePhysicalMemorySize(freePhysicalMemorySize). + setFreeSwapSpaceSize(freeSwapSpaceSize). + setHostname(InetAddress.getLocalHost().getHostName()). + setEtcMsg(sb.toString()). setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build()) .// build()); http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index def0182..5a3aa5a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -2255,6 +2255,16 @@ public final class CubeVisitProtos { * </pre> */ int getNormalComplete(); + + // optional int64 scannedBytes = 11; + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + boolean hasScannedBytes(); + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + long getScannedBytes(); } /** * Protobuf type {@code CubeVisitResponse.Stats} @@ -2357,6 +2367,11 @@ public final class CubeVisitProtos { normalComplete_ = input.readInt32(); break; } + case 88: { + bitField0_ |= 0x00000400; + scannedBytes_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2619,6 +2634,22 @@ public final class CubeVisitProtos { return normalComplete_; } + // optional int64 scannedBytes = 11; + public static final int SCANNEDBYTES_FIELD_NUMBER = 11; + private long scannedBytes_; + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public boolean hasScannedBytes() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public long getScannedBytes() { + return scannedBytes_; + } + private void initFields() { serviceStartTime_ = 0L; serviceEndTime_ = 0L; @@ -2630,6 +2661,7 @@ public final class CubeVisitProtos { hostname_ = ""; etcMsg_ = ""; normalComplete_ = 0; + scannedBytes_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2673,6 +2705,9 @@ public final class CubeVisitProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeInt32(10, normalComplete_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeInt64(11, scannedBytes_); + } getUnknownFields().writeTo(output); } @@ -2722,6 +2757,10 @@ public final class CubeVisitProtos { size += com.google.protobuf.CodedOutputStream .computeInt32Size(10, normalComplete_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(11, scannedBytes_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -2792,6 +2831,11 @@ public final class CubeVisitProtos { result = result && (getNormalComplete() == other.getNormalComplete()); } + result = result && (hasScannedBytes() == other.hasScannedBytes()); + if (hasScannedBytes()) { + result = result && (getScannedBytes() + == other.getScannedBytes()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -2848,6 +2892,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + NORMALCOMPLETE_FIELD_NUMBER; hash = (53 * hash) + getNormalComplete(); } + if (hasScannedBytes()) { + hash = (37 * hash) + SCANNEDBYTES_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getScannedBytes()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -2977,6 +3025,8 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000100); normalComplete_ = 0; bitField0_ = (bitField0_ & ~0x00000200); + scannedBytes_ = 0L; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -3045,6 +3095,10 @@ public final class CubeVisitProtos { to_bitField0_ |= 0x00000200; } result.normalComplete_ = normalComplete_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.scannedBytes_ = scannedBytes_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3095,6 +3149,9 @@ public final class CubeVisitProtos { if (other.hasNormalComplete()) { setNormalComplete(other.getNormalComplete()); } + if (other.hasScannedBytes()) { + setScannedBytes(other.getScannedBytes()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3550,6 +3607,39 @@ public final class CubeVisitProtos { return this; } + // optional int64 scannedBytes = 11; + private long scannedBytes_ ; + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public boolean hasScannedBytes() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public long getScannedBytes() { + return scannedBytes_; + } + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public Builder setScannedBytes(long value) { + bitField0_ |= 0x00000400; + scannedBytes_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 scannedBytes = 11;</code> + */ + public Builder clearScannedBytes() { + bitField0_ = (bitField0_ & ~0x00000400); + scannedBytes_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats) } @@ -4349,20 +4439,21 @@ public final class CubeVisitProtos { "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" + "eVisitRequest.IntList\022\027\n\017kylinProperties" + "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" + - "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n" + + "\007 \001(\010:\004true\032\027\n\007IntList\022\014\n\004ints\030\001 \003(\005\"\347\002\n" + "\021CubeVisitResponse\022\026\n\016compressedRows\030\001 \002", "(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitResponse.St" + - "ats\032\372\001\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" + + "ats\032\220\002\n\005Stats\022\030\n\020serviceStartTime\030\001 \001(\003\022" + "\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scannedRowCou" + "nt\030\003 \001(\003\022\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\r" + "systemCpuLoad\030\005 \001(\001\022\036\n\026freePhysicalMemor" + "ySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020" + "\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016norm" + - "alComplete\030\n \001(\0052F\n\020CubeVisitService\0222\n\t" + - "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" + - "ResponseB`\nEorg.apache.kylin.storage.hba", - "se.cube.v2.coprocessor.endpoint.generate" + - "dB\017CubeVisitProtosH\001\210\001\001\240\001\001" + "alComplete\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\0032F" + + "\n\020CubeVisitService\0222\n\tvisitCube\022\021.CubeVi" + + "sitRequest\032\022.CubeVisitResponseB`\nEorg.ap", + "ache.kylin.storage.hbase.cube.v2.coproce" + + "ssor.endpoint.generatedB\017CubeVisitProtos" + + "H\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4392,7 +4483,7 @@ public final class CubeVisitProtos { internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitResponse_Stats_descriptor, - new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", }); + new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", "ScannedBytes", }); return null; } }; http://git-wip-us.apache.org/repos/asf/kylin/blob/e09338b3/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index c7c2954..f416669 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -54,6 +54,7 @@ message CubeVisitResponse { optional string hostname = 8; optional string etcMsg = 9; optional int32 normalComplete =10;//when time outs, normalComplete will be false + optional int64 scannedBytes = 11; } required bytes compressedRows = 1; required Stats stats = 2;