KYLIN-1645 Report coproc exception back to the query thread
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a80a0f73 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a80a0f73 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a80a0f73 Branch: refs/heads/stream_m1 Commit: a80a0f7346247af607bb17f31845b1ecb0697330 Parents: 5772718 Author: Li Yang <liy...@apache.org> Authored: Wed Jun 22 11:26:26 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Jun 22 11:26:26 2016 +0800 ---------------------------------------------------------------------- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 29 ++++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a80a0f73/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 9cc1bee..bacf6e2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -86,6 +86,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { int current = 0; long timeout; long timeoutTS; + volatile Throwable coprocException; public ExpectedSizeIterator(int expectedSize) { this.expectedSize = expectedSize; @@ -117,19 +118,21 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } try { current++; - long tsRemaining = this.timeoutTS - System.currentTimeMillis(); - if (tsRemaining < 0) { - throw new RuntimeException("Timeout visiting cube!"); + byte[] ret = null; + + while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) { + ret = queue.poll(5000, TimeUnit.MILLISECONDS); } - byte[] ret = queue.poll(tsRemaining, TimeUnit.MILLISECONDS); - if (ret == null) { + if (coprocException != null) { + throw new RuntimeException("Error in coprocessor", coprocException); + } else if (ret == null) { throw new RuntimeException("Timeout visiting cube!"); } else { return ret; } } catch (InterruptedException e) { - throw new RuntimeException("error when waiting queue", e); + throw new RuntimeException("Error when waiting queue", e); } } @@ -149,6 +152,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { public long getTimeout() { return timeout; } + + public void notifyCoprocException(Throwable ex) { + coprocException = ex; + } } static class EndpointResultsAsGTScanner implements IGTScanner { @@ -389,12 +396,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } }); - } catch (Throwable throwable) { - throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable); + } catch (Throwable ex) { + logger.error(logHeader + "Error when visiting cubes by endpoint", ex); + epResultItr.notifyCoprocException(ex); + return; } if (abnormalFinish[0]) { - throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query..."); + Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query..."); + epResultItr.notifyCoprocException(ex); + return; } } });