This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 79a6dd1ff49 [fix](catalog)when checkpoint,use cacheThreadPool (#49097)
79a6dd1ff49 is described below

commit 79a6dd1ff494e3d000a65f4077449d05eb3953f6
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Wed Mar 26 16:05:58 2025 +0800

    [fix](catalog)when checkpoint,use cacheThreadPool (#49097)
    
    when checkpoint,if use fixedThreadPool,will Thread Overflow
    
    The checkpoint thread is a newly created thread pool (without reusing
    the business thread pool), with a core size and maxSize of 64. During
    the checkpoint process, if the guava cache reaches its expiration time
    (10 minutes), it will automatically create a task to clean up the cache.
    Since four caches are using this thread pool and have expiration times
    of 10 minutes, four threads are created simultaneously to clean up
    different caches. Since 4<coteSize (64), threads will not be
    automatically destroyed.
---
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     | 25 +++++++++++++++++-----
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 404590a786d..91c858fdbb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -817,7 +817,7 @@ public class Env {
         this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
         this.refreshManager = new RefreshManager();
         this.policyMgr = new PolicyMgr();
-        this.extMetaCacheMgr = new ExternalMetaCacheMgr();
+        this.extMetaCacheMgr = new ExternalMetaCacheMgr(isCheckpointCatalog);
         this.analysisManager = new AnalysisManager();
         this.statisticsCleaner = new StatisticsCleaner();
         this.statisticsAutoCollector = new StatisticsAutoCollector();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 9ad78816278..c2f50f929f8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -99,25 +99,25 @@ public class ExternalMetaCacheMgr {
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
     private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
 
-    public ExternalMetaCacheMgr() {
-        rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+    public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
+        rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog,
                 Config.max_external_cache_loader_thread_pool_size,
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "RowCountRefreshExecutor", 0, true);
 
-        commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+        commonRefreshExecutor = newThreadPool(isCheckpointCatalog,
                 Config.max_external_cache_loader_thread_pool_size,
                 Config.max_external_cache_loader_thread_pool_size * 10000,
                 "CommonRefreshExecutor", 10, true);
 
         // The queue size should be large enough,
         // because there may be thousands of partitions being queried at the 
same time.
-        fileListingExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+        fileListingExecutor = newThreadPool(isCheckpointCatalog,
                 Config.max_external_cache_loader_thread_pool_size,
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "FileListingExecutor", 10, true);
 
-        scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+        scheduleExecutor = newThreadPool(isCheckpointCatalog,
                 Config.max_external_cache_loader_thread_pool_size,
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "scheduleExecutor", 10, true);
@@ -131,6 +131,21 @@ public class ExternalMetaCacheMgr {
         paimonMetadataCacheMgr = new 
PaimonMetadataCacheMgr(commonRefreshExecutor);
     }
 
+    private ExecutorService newThreadPool(boolean isCheckpointCatalog, int 
numThread, int queueSize,
+            String poolName, int timeoutSeconds,
+            boolean needRegisterMetric) {
+        String executorNamePrefix = isCheckpointCatalog ? "Checkpoint" : 
"NotCheckpoint";
+        String realPoolName = executorNamePrefix + poolName;
+        // Business threads require a fixed size thread pool and use queues to 
store unprocessed tasks.
+        // Checkpoint threads have almost no business and need to be released 
in a timely manner to avoid thread leakage
+        if (isCheckpointCatalog) {
+            return ThreadPoolManager.newDaemonCacheThreadPool(numThread, 
realPoolName, needRegisterMetric);
+        } else {
+            return ThreadPoolManager.newDaemonFixedThreadPool(numThread, 
queueSize, realPoolName, timeoutSeconds,
+                    needRegisterMetric);
+        }
+    }
+
     public ExecutorService getFileListingExecutor() {
         return fileListingExecutor;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to