This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 79a6dd1ff49 [fix](catalog)when checkpoint,use cacheThreadPool (#49097) 79a6dd1ff49 is described below commit 79a6dd1ff494e3d000a65f4077449d05eb3953f6 Author: zhangdong <zhangd...@selectdb.com> AuthorDate: Wed Mar 26 16:05:58 2025 +0800 [fix](catalog)when checkpoint,use cacheThreadPool (#49097) During checkpoint, using a fixedThreadPool causes thread overflow. The checkpoint catalog uses a newly created thread pool (without reusing the business thread pool), with a core size and maxSize of 64. During the checkpoint process, if the guava cache reaches its expiration time (10 minutes), it will automatically create a task to clean up the cache. Since four caches are using this thread pool and have expiration times of 10 minutes, four threads are created simultaneously to clean up different caches. Since 4 < coreSize (64), these threads will not be automatically destroyed. --- .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../doris/datasource/ExternalMetaCacheMgr.java | 25 +++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 404590a786d..91c858fdbb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -817,7 +817,7 @@ public class Env { this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr); this.refreshManager = new RefreshManager(); this.policyMgr = new PolicyMgr(); - this.extMetaCacheMgr = new ExternalMetaCacheMgr(); + this.extMetaCacheMgr = new ExternalMetaCacheMgr(isCheckpointCatalog); this.analysisManager = new AnalysisManager(); this.statisticsCleaner = new StatisticsCleaner(); this.statisticsAutoCollector = new StatisticsAutoCollector(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 9ad78816278..c2f50f929f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -99,25 +99,25 @@ public class ExternalMetaCacheMgr { private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; - public ExternalMetaCacheMgr() { - rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + public ExternalMetaCacheMgr(boolean isCheckpointCatalog) { + rowCountRefreshExecutor = newThreadPool(isCheckpointCatalog, Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, "RowCountRefreshExecutor", 0, true); - commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + commonRefreshExecutor = newThreadPool(isCheckpointCatalog, Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 10000, "CommonRefreshExecutor", 10, true); // The queue size should be large enough, // because there may be thousands of partitions being queried at the same time. 
- fileListingExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + fileListingExecutor = newThreadPool(isCheckpointCatalog, Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, "FileListingExecutor", 10, true); - scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool( + scheduleExecutor = newThreadPool(isCheckpointCatalog, Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, "scheduleExecutor", 10, true); @@ -131,6 +131,21 @@ public class ExternalMetaCacheMgr { paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } + private ExecutorService newThreadPool(boolean isCheckpointCatalog, int numThread, int queueSize, + String poolName, int timeoutSeconds, + boolean needRegisterMetric) { + String executorNamePrefix = isCheckpointCatalog ? "Checkpoint" : "NotCheckpoint"; + String realPoolName = executorNamePrefix + poolName; + // Business threads require a fixed size thread pool and use queues to store unprocessed tasks. + // Checkpoint threads have almost no business and need to be released in a timely manner to avoid thread leakage + if (isCheckpointCatalog) { + return ThreadPoolManager.newDaemonCacheThreadPool(numThread, realPoolName, needRegisterMetric); + } else { + return ThreadPoolManager.newDaemonFixedThreadPool(numThread, queueSize, realPoolName, timeoutSeconds, + needRegisterMetric); + } + } + public ExecutorService getFileListingExecutor() { return fileListingExecutor; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org