KYLIN-2005 Move all storage side behavior hints to GTScanRequest
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2c875d8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2c875d8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2c875d8 Branch: refs/heads/master Commit: a2c875d8a2d06f23dd6467bbcc459bff82918295 Parents: e38557b Author: Hongbin Ma <mahong...@apache.org> Authored: Fri Sep 9 16:46:22 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Fri Sep 9 17:47:29 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/gridtable/GTScanRequest.java | 33 +- .../kylin/gridtable/GTScanRequestBuilder.java | 30 +- .../kylin/gridtable/StorageSideBehavior.java | 30 + .../apache/kylin/query/ITKylinQueryTest.java | 4 +- .../common/coprocessor/CoprocessorBehavior.java | 30 - .../observer/AggregateRegionObserver.java | 10 +- .../observer/AggregationScanner.java | 16 +- .../coprocessor/observer/ObserverEnabler.java | 6 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 88 +-- .../hbase/cube/v2/ExpectedSizeIterator.java | 4 +- .../coprocessor/endpoint/CubeVisitService.java | 18 +- .../endpoint/generated/CubeVisitProtos.java | 754 ++++--------------- .../endpoint/protobuf/CubeVisit.proto | 13 +- .../observer/AggregateRegionObserverTest.java | 6 +- 14 files changed, 332 insertions(+), 710 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 5d27028..3e57e86 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -59,6 +59,9 @@ public class GTScanRequest { private String[] aggrMetricsFuncs;// // hint to storage behavior + private String storageBehavior; + private long startTime; + private long timeout; private boolean allowStorageAggregation; private double aggCacheMemThreshold; private int storageScanRowNumThreshold; @@ -69,7 +72,7 @@ public class GTScanRequest { GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, // - double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) { + double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit, String storageBehavior, long startTime, long timeout) { this.info = info; if (ranges == null) { this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); @@ -83,6 +86,9 @@ public class GTScanRequest { this.aggrMetrics = aggrMetrics; this.aggrMetricsFuncs = aggrMetricsFuncs; + this.storageBehavior = storageBehavior; + this.startTime = startTime; + this.timeout = timeout; this.allowStorageAggregation = allowStorageAggregation; this.aggCacheMemThreshold = aggCacheMemThreshold; this.storageScanRowNumThreshold = storageScanRowNumThreshold; @@ -115,6 +121,10 @@ public class GTScanRequest { } } + public void setTimeout(long timeout) { + this.timeout = timeout; + } + private void validateFilterPushDown(GTInfo info) { if (!hasFilterPushDown()) return; @@ -280,6 +290,18 @@ public class GTScanRequest { return this.storagePushDownLimit; } + public String getStorageBehavior() { + return storageBehavior; + } + + public long getStartTime() { + return startTime; + } + + public long getTimeout() { + return timeout; + } + @Override public String toString() { return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; @@ -320,6 +342,9 @@ public class GTScanRequest { out.putDouble(value.aggCacheMemThreshold); BytesUtil.writeVInt(value.storageScanRowNumThreshold, out); BytesUtil.writeVInt(value.storagePushDownLimit, out); + BytesUtil.writeVLong(value.startTime, out); + BytesUtil.writeVLong(value.timeout, out); + BytesUtil.writeUTFString(value.storageBehavior, out); } @Override @@ -350,11 +375,15 @@ public class GTScanRequest { double sAggrCacheGB = in.getDouble(); int storageScanRowNumThreshold = BytesUtil.readVInt(in); int storagePushDownLimit = BytesUtil.readVInt(in); + long startTime = BytesUtil.readVLong(in); + long timeout = BytesUtil.readVLong(in); + String storageBehavior = BytesUtil.readUTFString(in); return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).// setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).// setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).// - setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest(); + setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).// + setStartTime(startTime).setTimeout(timeout).setStorageBehavior(storageBehavior).createGTScanRequest(); } private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java index c4390cd..f542de1 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java @@ -21,6 +21,7 @@ package org.apache.kylin.gridtable; import java.util.BitSet; import java.util.List; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.metadata.filter.TupleFilter; @@ -36,6 +37,9 @@ public class GTScanRequestBuilder { private double aggCacheMemThreshold = 0; private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception. private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $toragePushDownLimit aggregated rows are produced. + private long startTime = -1; + private long timeout = -1; + private String storageBehavior = null; public GTScanRequestBuilder setInfo(GTInfo info) { this.info = info; @@ -92,6 +96,21 @@ public class GTScanRequestBuilder { return this; } + public GTScanRequestBuilder setStartTime(long startTime) { + this.startTime = startTime; + return this; + } + + public GTScanRequestBuilder setTimeout(long timeout) { + this.timeout = timeout; + return this; + } + + public GTScanRequestBuilder setStorageBehavior(String storageBehavior) { + this.storageBehavior = storageBehavior; + return this; + } + public GTScanRequest createGTScanRequest() { if (aggrGroupBy == null) { aggrGroupBy = new ImmutableBitSet(new BitSet()); @@ -104,7 +123,14 @@ public class GTScanRequestBuilder { if (aggrMetricsFuncs == null) { aggrMetricsFuncs = new String[0]; } - - return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit); + + if (storageBehavior == null) { + storageBehavior = BackdoorToggles.getCoprocessorBehavior() == null ? StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); + } + + this.startTime = startTime == -1 ? System.currentTimeMillis() : startTime; + this.timeout = timeout == -1 ? 300000 : timeout; + + return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageBehavior, startTime, timeout); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java new file mode 100644 index 0000000..7fa93e7 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +/** + */ +public enum StorageSideBehavior { + RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed + SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result + SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result + SCAN_FILTER_AGGR, //aggregate the result. Will return results + SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results + SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index fc2fd52..0efea64 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -40,7 +40,7 @@ import org.apache.kylin.query.routing.Candidate; import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule; import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.storage.hbase.HBaseStorage; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; +import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; @@ -140,7 +140,7 @@ public class ITKylinQueryTest extends KylinTestBase { }); Map<String, String> toggles = Maps.newHashMap(); - toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan + toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan BackdoorToggles.setToggles(toggles); KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java deleted file mode 100644 index 5f21351..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorBehavior.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.common.coprocessor; - -/** - */ -public enum CoprocessorBehavior { - RAW_SCAN, //on use RegionScanner to scan raw data, for testing hbase scan speed - SCAN, //only scan data, used for profiling tuple scan speed. Will not return any result - SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result - SCAN_FILTER_AGGR, //aggregate the result. Will return results - SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results - SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY, // on each scan operation, delay for 10s to simulate slow queries, for test use -} http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java index c7b650a..7139ca7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; +import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; @@ -85,15 +85,15 @@ public class AggregateRegionObserver extends BaseRegionObserver { byte[] filterBytes = scan.getAttribute(FILTER); CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes); - CoprocessorBehavior coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM; + StorageSideBehavior storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM; try { byte[] behavior = scan.getAttribute(BEHAVIOR); if (behavior != null && behavior.length != 0) { - coprocessorBehavior = CoprocessorBehavior.valueOf(new String(behavior)); + storageSideBehavior = StorageSideBehavior.valueOf(new String(behavior)); } } catch (Exception e) { LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e); - coprocessorBehavior = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM; + storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM; } // start/end region operation & sync on scanner is suggested by the @@ -103,7 +103,7 @@ public class AggregateRegionObserver extends BaseRegionObserver { region.startRegionOperation(); try { synchronized (innerScanner) { - return new AggregationScanner(type, filter, projector, aggregators, innerScanner, coprocessorBehavior); + return new AggregationScanner(type, filter, projector, aggregators, innerScanner, storageSideBehavior); } } finally { region.closeRegionOperation(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java index be26142..a77f988 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; +import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; @@ -39,9 +39,9 @@ import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; public class AggregationScanner implements RegionScanner { private RegionScanner outerScanner; - private CoprocessorBehavior behavior; + private StorageSideBehavior behavior; - public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, CoprocessorBehavior behavior) throws IOException { + public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, StorageSideBehavior behavior) throws IOException { AggregateRegionObserver.LOG.info("Kylin Coprocessor start"); @@ -79,23 +79,23 @@ public class AggregationScanner implements RegionScanner { Cell cell = results.get(0); tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - if (behavior == CoprocessorBehavior.SCAN) { + if (behavior == StorageSideBehavior.SCAN) { //touch every byte of the cell so that the cost of scanning will be trully reflected int endIndex = cell.getRowOffset() + cell.getRowLength(); for (int i = cell.getRowOffset(); i < endIndex; ++i) { meaninglessByte += cell.getRowArray()[i]; } } else { - if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal()) { + if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal()) { if (filter != null && filter.evaluate(tuple) == false) continue; - if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()) { + if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal()) { AggrKey aggKey = projector.getAggrKey(results); MeasureAggregator[] bufs = aggCache.getBuffer(aggKey); aggregators.aggregate(bufs, results); - if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { + if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { aggCache.checkMemoryUsage(); } } @@ -103,7 +103,7 @@ public class AggregationScanner implements RegionScanner { } } - if (behavior == CoprocessorBehavior.SCAN) { + if (behavior == StorageSideBehavior.SCAN) { System.out.println("meaningless byte is now " + meaninglessByte); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java index f0e9bed..394b3e2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java @@ -35,7 +35,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; +import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; @@ -75,14 +75,14 @@ public class ObserverEnabler { if (localCoprocessor) { RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan)); - AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM); + AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM); return new ResultScannerAdapter(aggrScanner); } else { // debug/profiling purpose String toggle = BackdoorToggles.getCoprocessorBehavior(); if (toggle == null) { - toggle = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior + toggle = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior } else { logger.info("The execution of this query will use " + toggle + " as observer's behavior"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 5b48351..573951b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; @@ -47,7 +46,6 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse; @@ -104,10 +102,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { - final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); - - logger.info("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle); - Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard(); short shardNum = shardNumAndBaseShard.getFirst(); short cuboidBaseShard = shardNumAndBaseShard.getSecond(); @@ -130,39 +124,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { //TODO: raw scan can be constructed at region side to reduce traffic List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); - int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; - while (true) { - try { - ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize); - BytesUtil.writeVInt(rawScans.size(), rawScanBuffer); - for (RawScan rs : rawScans) { - RawScan.serializer.serialize(rs, rawScanBuffer); - } - rawScanBuffer.flip(); - rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()); - break; - } catch (BufferOverflowException boe) { - logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize); - rawScanBufferSize *= 4; - } - } + rawScanByteString = serializeRawScans(rawScans); + scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it - - int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; - while (true) { - try { - ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize); - GTScanRequest.serializer.serialize(scanRequest, buffer); - buffer.flip(); - scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); - break; - } catch (BufferOverflowException boe) { - logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize); - scanRequestBufferSize *= 4; - } - } - - logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); + final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); + scanRequest.setTimeout(epResultItr.getRpcTimeout()); + scanRequestByteString = serializeGTScanReq(scanRequest); + + logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); for (RawScan rs : rawScans) { @@ -172,7 +141,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size()); final AtomicLong totalScannedCount = new AtomicLong(0); - final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); // KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries // for different cubes until redeployment of coprocessor jar. @@ -184,9 +152,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { builder.addHbaseColumnsToGT(intList); } builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); - builder.setBehavior(toggle); - builder.setStartTime(System.currentTimeMillis()); - builder.setTimeout(epResultItr.getRpcTimeout()); builder.setKylinProperties(kylinConfig.getConfigAsString()); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { @@ -260,6 +225,45 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); } + private ByteString serializeGTScanReq(GTScanRequest scanRequest) { + ByteString scanRequestByteString; + int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; + while (true) { + try { + ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize); + GTScanRequest.serializer.serialize(scanRequest, buffer); + buffer.flip(); + scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); + break; + } catch (BufferOverflowException boe) { + logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize); + scanRequestBufferSize *= 4; + } + } + return scanRequestByteString; + } + + private ByteString serializeRawScans(List<RawScan> rawScans) { + ByteString rawScanByteString; + int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; + while (true) { + try { + ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize); + BytesUtil.writeVInt(rawScans.size(), rawScanBuffer); + for (RawScan rs : rawScans) { + RawScan.serializer.serialize(rs, rawScanBuffer); + } + rawScanBuffer.flip(); + rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()); + break; + } catch (BufferOverflowException boe) { + logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize); + rawScanBufferSize *= 4; + } + } + return rawScanByteString; + } + private String getStatsString(byte[] region, CubeVisitResponse result) { StringBuilder sb = new StringBuilder(); Stats stats = result.getStats(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index 442963f..f4729a3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -92,8 +92,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> { if (coprocException instanceof GTScanSelfTerminatedException) throw (GTScanSelfTerminatedException) coprocException; else - throw new RuntimeException("Error in coprocessor",coprocException); - + throw new RuntimeException("Error in coprocessor", coprocException); + } else if (ret == null) { throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + // GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?"); http://git-wip-us.apache.org/repos/asf/kylin/blob/a2c875d8/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 064d100..36adca1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -54,11 +54,11 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; +import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.cube.v2.CellListIterator; import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC; import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore; @@ -198,10 +198,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); } - CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); + StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior()); final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); - appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - request.getStartTime())); + appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime())); MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() { @Override @@ -228,7 +228,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator()); - if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) { + if (behavior.ordinal() < StorageSideBehavior.SCAN.ordinal()) { //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed List<Cell> temp = Lists.newArrayList(); int counter = 0; @@ -240,12 +240,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "scanned " + counter); } - if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { + if (behavior.ordinal() < StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { scanReq.disableAggCacheMemCheck(); // disable mem check if so told } final MutableBoolean scanNormalComplete = new MutableBoolean(true); - final long deadline = request.getTimeout() + this.serviceStartTime; + final long deadline = scanReq.getTimeout() + this.serviceStartTime; final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); final CellListIterator cellListIterator = new CellListIterator() { @@ -285,12 +285,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement }; IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, // - request.getRowkeyPreambleSize(), CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(request.getBehavior())); + request.getRowkeyPreambleSize(), StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString().equals(scanReq.getStorageBehavior())); IGTScanner rawScanner = store.scan(scanReq); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, // - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), // - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline); + behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER.ordinal(), // + behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR.ordinal(), deadline); ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);