This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 6203bf8 Minor, unify thread capacity for scannerThreadPool 6203bf8 is described below commit 6203bf8389bb4f8d6a95c07b3b9a32c28e8cc9df Author: XiaoxiangYu <x...@apache.org> AuthorDate: Mon Jan 18 18:16:19 2021 +0800 Minor, unify thread capacity for scannerThreadPool --- .../src/main/java/org/apache/kylin/common/KylinConfigBase.java | 2 +- .../apache/kylin/stream/core/query/MultiThreadsResultCollector.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 2e16f47..c3739b0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2561,7 +2561,7 @@ public abstract class KylinConfigBase implements Serializable { } public int getStreamingReceiverQueryCoreThreads() { - int def = getStreamingReceiverQueryMaxThreads() - 1; + int def = Math.max(2, AVAILABLE_PROCESSORS - 1); return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + "")); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java index b914e79..76a83c7 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java @@ -45,7 +45,8 @@ public class MultiThreadsResultCollector extends ResultCollector { static { KylinConfig config = KylinConfig.getInstanceFromEnv(); MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads(); - scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(), + int coreThreads = config.getStreamingReceiverQueryCoreThreads(); + scannerThreadPool = new ThreadPoolExecutor(coreThreads, MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker")); } @@ -204,7 +205,7 @@ public class MultiThreadsResultCollector extends ResultCollector { public static boolean isFullUp() { boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT; if (occupied) { - logger.debug("ThreadPool {}", scannerThreadPool); + logger.debug("ThreadPool {} is full .", scannerThreadPool); } return occupied; }