Repository: kylin
Updated Branches:
  refs/heads/yang-m1 982a7b785 -> 93b756dd8
enable query level timeout backdoor toggle and correct ExpectedSizeIterator wait timeout Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/93b756dd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/93b756dd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/93b756dd Branch: refs/heads/yang-m1 Commit: 93b756dd88b9a3a4362c82f5e969dc1e9bfb1528 Parents: 982a7b7 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Apr 25 13:35:56 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Apr 25 13:39:19 2016 +0800 ---------------------------------------------------------------------- .../kylin/common/debug/BackdoorToggles.java | 18 +++++++++++++++ .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 23 ++++++++++++++++---- 2 files changed, 37 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/93b756dd/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java index f3745d7..0feff2d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java +++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java @@ -54,6 +54,14 @@ public class BackdoorToggles { return getBoolean(DEBUG_TOGGLE_LOCAL_COPROCESSOR); } + public static int getQueryTimeout() { + String v = getString(DEBUG_TOGGLE_QUERY_TIMEOUT); + if (v == null) + return -1; + else + return Integer.valueOf(v); + } + private static String getString(String key) { Map<String, String> toggles = _backdoorToggles.get(); if (toggles == null) { @@ -135,4 +143,14 @@ public class BackdoorToggles { } */ public final static String 
DEBUG_TOGGLE_LOCAL_COPROCESSOR = "DEBUG_TOGGLE_LOCAL_COPROCESSOR"; + + /** + * set DEBUG_TOGGLE_QUERY_TIMEOUT="timeout_millis" to overwrite the global timeout settings + * + example:(put it into request body) + "backdoorToggles": { + "DEBUG_TOGGLE_QUERY_TIMEOUT": "120000" + } + */ + public final static String DEBUG_TOGGLE_QUERY_TIMEOUT = "DEBUG_TOGGLE_QUERY_TIMEOUT"; } http://git-wip-us.apache.org/repos/asf/kylin/blob/93b756dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 053d99e..99a72bb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -112,19 +112,29 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { static class ExpectedSizeIterator implements Iterator<byte[]> { + BlockingQueue<byte[]> queue; + int expectedSize; int current = 0; - BlockingQueue<byte[]> queue; long timeout; + long timeoutTS; public ExpectedSizeIterator(int expectedSize) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); + this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); - + + if (BackdoorToggles.getQueryTimeout() != -1) { + this.timeout = BackdoorToggles.getQueryTimeout(); + } + this.timeout *= 1.1;//allow for some delay - logger.info("Timeout for ExpectedSizeIterator is " + this.timeout); + + logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout); + + this.timeoutTS = System.currentTimeMillis() + 
this.timeout; } @Override @@ -139,7 +149,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } try { current++; - byte[] ret = queue.poll(timeout, TimeUnit.MILLISECONDS); + long tsRemaining = this.timeoutTS - System.currentTimeMillis(); + if (tsRemaining < 0) { + throw new RuntimeException("Timeout visiting cube!"); + } + + byte[] ret = queue.poll(tsRemaining, TimeUnit.MILLISECONDS); if (ret == null) { throw new RuntimeException("Timeout visiting cube!"); } else {