This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6096f0869f2293699d4ad8fe6499c1042416d65a Author: ggke <g...@autohome.com.cn> AuthorDate: Fri Oct 9 18:54:08 2020 +0800 KYLIN-4771 Clear the recordCachePool when the deadline has reached;add timeout for the recordCachePool offer method. --- .../kylin/stream/core/query/MultiThreadsResultCollector.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java index 0ca08e4..f0c91fd 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java @@ -89,6 +89,7 @@ public class MultiThreadsResultCollector extends ResultCollector { if (System.currentTimeMillis() > deadline) { masterThread.interrupt(); // notify main thread cancelFlag.set(true); + recordCachePool.clear(); logger.warn("Beyond the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } @@ -112,6 +113,7 @@ public class MultiThreadsResultCollector extends ResultCollector { if (one == null) { masterThread.interrupt(); // notify main thread cancelFlag.set(true); + recordCachePool.clear(); logger.debug("Exceeded the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } @@ -141,10 +143,15 @@ public class MultiThreadsResultCollector extends ResultCollector { @Override public void run() { + long offserTimeout = 0L; try { result.startRead(); for (Record record : result) { - recordCachePool.put(record.copy()); + offserTimeout = deadline - System.currentTimeMillis(); + if (!recordCachePool.offer(record, offserTimeout, TimeUnit.MILLISECONDS)) { + logger.warn("Timeout when offer to recordCachePool, deadline: {}, offser Timeout: {}", deadline, offserTimeout); + break; + } } result.endRead(); } catch (InterruptedException inter) {