KYLIN-2079: add an explicit configuration knob for the coprocessor timeout
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6bdd4f38 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6bdd4f38 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6bdd4f38 Branch: refs/heads/v1.6.0-rc1-cdh5.7 Commit: 6bdd4f38335b3f4f37fd1e263f667d3daf5bfd73 Parents: 11c2c69 Author: gaodayue <gaoda...@meituan.com> Authored: Tue Oct 11 15:11:38 2016 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Sun Oct 30 22:32:45 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 4 ++ .../apache/kylin/common/KylinConfigBase.java | 8 +-- .../kylin/gridtable/StorageSideBehavior.java | 2 +- .../apache/kylin/query/ITKylinQueryTest.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 6 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 24 +++++++ .../hbase/cube/v2/ExpectedSizeIterator.java | 73 ++++++-------------- 7 files changed, 60 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 33a4e76..24e8f50 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -137,6 +137,10 @@ kylin.query.mem.budget=3221225472 kylin.query.coprocessor.mem.gb=3 +# The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds; +# you can set a smaller value (larger values are capped at the default). 0 means use the default. 
+# kylin.query.coprocessor.timeout.seconds=0 + # Enable/disable ACL check for cube query kylin.query.security.enabled=true http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index f49127b..26c280b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -485,10 +485,6 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000")); } - public float getCubeVisitTimeoutTimes() { - return Float.parseFloat(getOptional("kylin.query.cube.visit.timeout.times", "1")); - } - public int getBadQueryStackTraceDepth() { return Integer.parseInt(getOptional("kylin.query.badquery.stacktrace.depth", "10")); } @@ -541,6 +537,10 @@ abstract public class KylinConfigBase implements Serializable { return Double.parseDouble(this.getOptional("kylin.query.coprocessor.mem.gb", "3.0")); } + public int getQueryCoprocessorTimeoutSeconds() { + return Integer.parseInt(this.getOptional("kylin.query.coprocessor.timeout.seconds", "0")); + } + public boolean isQuerySecureEnabled() { return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "true")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java index b01ac3f..d87b41b 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/StorageSideBehavior.java @@ -26,7 +26,7 @@ public enum StorageSideBehavior { SCAN_FILTER, //only scan+filter used,used for profiling filter speed. Will not return any result SCAN_FILTER_AGGR, //aggregate the result. Will return results SCAN_FILTER_AGGR_CHECKMEM, //default full operations. Will return results - SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10s to simulate slow queries, for test use + SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY; // on each scan operation, delay for 10ms to simulate slow queries, for test use public boolean filterToggledOn() { return this.ordinal() >= SCAN_FILTER.ordinal(); http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 2ec5324..61926d8 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -102,7 +102,7 @@ public class ITKylinQueryTest extends KylinTestBase { toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan BackdoorToggles.setToggles(toggles); - KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s + KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "3"); //these two cubes has RAW measure, will disturb limit push down RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); @@ -115,7 +115,7 @@ public class ITKylinQueryTest 
extends KylinTestBase { RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); - KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s + KylinConfig.getInstanceFromEnv().setProperty("kylin.query.coprocessor.timeout.seconds", "0"); // set timeout to default BackdoorToggles.cleanToggles(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 4f538ae..d99f80e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -128,10 +128,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks); rawScanByteString = serializeRawScans(rawScans); + int coprocessorTimeout = getCoprocessorTimeoutMillis(); + scanRequest.setTimeout(coprocessorTimeout); scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it - final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); - scanRequest.setTimeout(epResultItr.getRpcTimeout()); scanRequestByteString = serializeGTScanReq(scanRequest); + + final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout); logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", 
scanRequestByteString.size(), rawScanByteString.size()); http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index da087c9..05b34c7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -22,11 +22,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; @@ -45,6 +48,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.IGTStorage; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -274,4 +278,24 @@ public abstract class CubeHBaseRPC implements IGTStorage { logger.info(info.toString()); } + protected int getCoprocessorTimeoutMillis() { + int configTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000; + if (configTimeout == 0) { + configTimeout = Integer.MAX_VALUE; + } + + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + int 
rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + // final timeout should be smaller than rpc timeout + int upper = (int) (rpcTimeout * 0.9); + + int timeout = Math.min(upper, configTimeout); + if (BackdoorToggles.getQueryTimeout() != -1) { + timeout = Math.min(upper, BackdoorToggles.getQueryTimeout()); + } + + logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, timeout); + return timeout; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/6bdd4f38/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index c27e5fc..2d574bd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -24,50 +24,25 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTScanSelfTerminatedException; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -class ExpectedSizeIterator implements Iterator<byte[]> { - private static final Logger logger = LoggerFactory.getLogger(ExpectedSizeIterator.class); - - BlockingQueue<byte[]> queue; +import com.google.common.base.Throwables; - int expectedSize; - int current 
= 0; - long rpcTimeout; - long timeout; - long timeoutTS; - volatile Throwable coprocException; - - public ExpectedSizeIterator(int expectedSize) { +class ExpectedSizeIterator implements Iterator<byte[]> { + private BlockingQueue<byte[]> queue; + private int expectedSize; + private int current = 0; + private int coprocessorTimeout; + private long deadline; + private volatile Throwable coprocException; + + public ExpectedSizeIterator(int expectedSize, int coprocessorTimeout) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); - StringBuilder sb = new StringBuilder(); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - - this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - sb.append("rpc timeout is " + this.rpcTimeout + " and after multiply retry times becomes " + this.timeout); - - this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); - sb.append(" after multiply kylin.query.cube.visit.timeout.times becomes " + this.timeout); - - logger.info(sb.toString()); - - if (BackdoorToggles.getQueryTimeout() != -1) { - this.timeout = BackdoorToggles.getQueryTimeout(); - logger.info("rpc timeout is overwritten to " + this.timeout); - } - - this.timeoutTS = System.currentTimeMillis() + 2 * this.timeout;//longer timeout than coprocessor so that query thread will not timeout faster than coprocessor + this.coprocessorTimeout = coprocessorTimeout; + //longer timeout than coprocessor so that query thread will not timeout faster than coprocessor + this.deadline = System.currentTimeMillis() + coprocessorTimeout * 10; } @Override @@ -84,22 +59,20 @@ class ExpectedSizeIterator implements Iterator<byte[]> { current++; byte[] ret = null; - while (ret == null && coprocException == null && timeoutTS > 
System.currentTimeMillis()) { - ret = queue.poll(10000, TimeUnit.MILLISECONDS); + while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) { + ret = queue.poll(1000, TimeUnit.MILLISECONDS); } if (coprocException != null) { - if (coprocException instanceof GTScanSelfTerminatedException) - throw (GTScanSelfTerminatedException) coprocException; - else - throw new RuntimeException("Error in coprocessor", coprocException); + throw Throwables.propagate(coprocException); + } - } else if (ret == null) { + if (ret == null) { throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + // - GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + timeout + ") cannot support this many scans?"); - } else { - return ret; + GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?"); } + + return ret; } catch (InterruptedException e) { throw new RuntimeException("Error when waiting queue", e); } @@ -118,10 +91,6 @@ class ExpectedSizeIterator implements Iterator<byte[]> { } } - public long getRpcTimeout() { - return this.timeout; - } - public void notifyCoprocException(Throwable ex) { coprocException = ex; }