This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6203bf8  Minor, unify thread capacity for scannerThreadPool
6203bf8 is described below

commit 6203bf8389bb4f8d6a95c07b3b9a32c28e8cc9df
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Mon Jan 18 18:16:19 2021 +0800

    Minor, unify thread capacity for scannerThreadPool
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java       | 2 +-
 .../apache/kylin/stream/core/query/MultiThreadsResultCollector.java  | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2e16f47..c3739b0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2561,7 +2561,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getStreamingReceiverQueryCoreThreads() {
-        int def = getStreamingReceiverQueryMaxThreads() - 1;
+        int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
         return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + ""));
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index b914e79..76a83c7 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -45,7 +45,8 @@ public class MultiThreadsResultCollector extends ResultCollector {
     static {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads();
-        scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+        int coreThreads = config.getStreamingReceiverQueryCoreThreads();
+        scannerThreadPool = new ThreadPoolExecutor(coreThreads,
                 MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker"));
     }
@@ -204,7 +205,7 @@ public class MultiThreadsResultCollector extends ResultCollector {
     public static boolean isFullUp() {
         boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT;
         if (occupied) {
-            logger.debug("ThreadPool {}", scannerThreadPool);
+            logger.debug("ThreadPool {} is full .", scannerThreadPool);
         }
         return occupied;
     }

Reply via email to