This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 03994b8d939 branch-2.1: [fix](iceberg)Fix the thread pool issue used 
for commit. #51508 (#51528)
03994b8d939 is described below

commit 03994b8d9397100f4526b4fc6b03eac828ba5699
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 11 06:31:40 2025 +0800

    branch-2.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 
(#51528)
    
    Cherry-picked from #51508
    
    ---------
    
    Co-authored-by: wuwenchi <[email protected]>
    Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
 .../org/apache/doris/common/ThreadPoolManager.java | 68 ++++++++++++++++++++++
 .../apache/doris/datasource/ExternalCatalog.java   | 12 ++++
 .../doris/datasource/hive/HMSExternalCatalog.java  |  6 ++
 .../datasource/iceberg/IcebergExternalCatalog.java |  7 +++
 .../datasource/iceberg/IcebergMetadataOps.java     |  5 ++
 .../datasource/iceberg/IcebergTransaction.java     |  4 +-
 6 files changed, 100 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 3be5af8ac54..0061e3702a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common;
 
 
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -68,6 +70,7 @@ import java.util.function.Supplier;
  */
 
 public class ThreadPoolManager {
+    private static final Logger LOG = 
LogManager.getLogger(ThreadPoolManager.class);
 
     private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = 
Maps.newConcurrentMap();
 
@@ -126,6 +129,50 @@ public class ThreadPoolManager {
             new LogDiscardPolicyThrowException(poolName), poolName, 
needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth( // fixed-size variant of newDaemonThreadPoolWithPreAuth
+            int numThread, // used as both corePoolSize and maximumPoolSize
+            int queueSize, // capacity of the LinkedBlockingQueue that holds pending tasks
+            String poolName, // passed to the delegate and to BlockedPolicy
+            boolean needRegisterMetric, // forwarded unchanged to the delegate
+            PreExecutionAuthenticator preAuth) { // forwarded unchanged to the delegate
+        return newDaemonThreadPoolWithPreAuth(numThread, numThread, 
KEEP_ALIVE_TIME, TimeUnit.SECONDS, // KEEP_ALIVE_TIME is declared outside this hunk; unit is seconds
+                new LinkedBlockingQueue<>(queueSize), new 
BlockedPolicy(poolName, 60), // NOTE(review): 60 is presumably a timeout for blocked submissions - confirm against BlockedPolicy
+                poolName, needRegisterMetric, preAuth);
+    }
+
+    public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth( // builds a ThreadPoolExecutor whose threads run through preAuth
+            int corePoolSize, // passed straight to the ThreadPoolExecutor constructor
+            int maximumPoolSize, // passed straight to the ThreadPoolExecutor constructor
+            long keepAliveTime, // expressed in `unit`
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue, // queue handed to the executor for pending tasks
+            RejectedExecutionHandler handler, // handler handed to the executor for rejected tasks
+            String poolName, // thread-name prefix and, when registered, the key in nameToThreadPoolMap
+            boolean needRegisterMetric, // when true the new pool is stored in nameToThreadPoolMap
+            PreExecutionAuthenticator preAuth) { // wrapped around each thread body by the thread factory
+        ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, 
preAuth); // daemon threads named poolName-<n> that run inside preAuth.execute
+        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, 
maximumPoolSize,
+                keepAliveTime, unit, workQueue, threadFactory, handler);
+        if (needRegisterMetric) { // NOTE(review): the map is presumably read by metric registration elsewhere - confirm
+            nameToThreadPoolMap.put(poolName, threadPool); // put() replaces any pool already stored under the same name
+        }
+        return threadPool;
+    }
+
+    private static ThreadFactory namedThreadFactoryWithPreAuth(String 
poolName, PreExecutionAuthenticator preAuth) { // factory for named daemon threads whose body runs via preAuth
+        return new ThreadFactoryBuilder()
+                .setDaemon(true) // threads are created as daemon threads
+                .setNameFormat(poolName + "-%d") // thread name is poolName, a dash, then a counter
+                .setThreadFactory(runnable -> new Thread(() -> { // backing factory: wrap the runnable instead of running it directly
+                    try {
+                        preAuth.execute(runnable); // NOTE(review): runnable is whatever the executor passes to the factory, presumably its worker loop - confirm
+                    } catch (Exception e) {
+                        throw new RuntimeException(e); // rethrown unchecked with the original exception as cause
+                    }
+                }))
+                .build();
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
             int queueSize, String poolName, boolean needRegisterMetric) {
         return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, 
TimeUnit.SECONDS,
@@ -377,4 +424,25 @@ public class ThreadPoolManager {
             }
         }
     }
+
+    public static void shutdownExecutorService(ExecutorService 
executorService) { // two-phase shutdown; can block the caller for up to 2 x 60 seconds
+        // Phase 1: stop accepting new tasks; already submitted tasks keep running
+        executorService.shutdown();
+        try {
+            // Give the already submitted tasks up to 60 seconds to finish
+            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                // Phase 2: timed out, so ask the executor to stop running tasks and drop queued ones
+                executorService.shutdownNow();
+                // Give the tasks another 60 seconds to react to the cancellation
+                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOG.warn("ExecutorService did not terminate"); // only logged; nothing is thrown to the caller
+                }
+            }
+        } catch (InterruptedException e) {
+            // The waiting thread itself was interrupted: force the shutdown and stop waiting
+            executorService.shutdownNow();
+            // Restore the interrupt flag so the caller can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bd3d966ac37..6052be35f57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
@@ -91,6 +92,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 /**
@@ -117,6 +119,8 @@ public abstract class ExternalCatalog
             CREATE_TIME,
             USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 
Runtime.getRuntime().availableProcessors(); // pool size for threadPoolWithPreAuth: processor count, read once when the class is initialized
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
@@ -157,6 +161,7 @@ public abstract class ExternalCatalog
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
     protected PreExecutionAuthenticator preExecutionAuthenticator;
+    protected ThreadPoolExecutor threadPoolWithPreAuth;
 
     private volatile Configuration cachedConf = null;
     private byte[] confLock = new byte[0];
@@ -759,6 +764,9 @@ public abstract class ExternalCatalog
         if (null != transactionManager) {
             transactionManager = null;
         }
+        if (threadPoolWithPreAuth != null) {
+            ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+        }
         CatalogIf.super.onClose();
     }
 
@@ -1170,4 +1178,8 @@ public abstract class ExternalCatalog
             tableAutoAnalyzePolicy.put(key, policy);
         }
     }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() { // accessor for the pre-authenticated pool
+        return threadPoolWithPreAuth; // field has no initializer in this class, so this is null until it has been assigned
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index defa2a8ebe9..73598adfcaf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -197,6 +197,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     
String.valueOf(Config.hive_metastore_client_timeout_second));
         }
         HiveMetadataOps hiveOps = 
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        threadPoolWithPreAuth = 
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("hms_iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new 
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), 
this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 30590f5af26..e25199adfbe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.ThreadPoolManager;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -65,6 +66,12 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         initCatalog();
         IcebergMetadataOps ops = 
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = 
TransactionManagerFactory.createIcebergTransactionManager(ops);
+        threadPoolWithPreAuth = 
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         metadataOps = ops;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 4c06068aa3b..78c4ef8fec2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -291,4 +292,8 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
     private Namespace getNamespace() {
         return externalCatalogName.map(Namespace::of).orElseGet(() -> 
Namespace.empty());
     }
+
+    public ThreadPoolExecutor getThreadPoolWithPreAuth() { // exposes the catalog's pool to callers of this ops object
+        return dorisCatalog.getThreadPoolExecutor(); // NOTE(review): presumably ExternalCatalog.getThreadPoolExecutor(), which can return null - confirm
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e..797caea0dea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
 
     private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
         // commit append files.
-        AppendFiles appendFiles = table.newAppend();
+        AppendFiles appendFiles = 
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
             // 1. if dst_tb is a partitioned table, it will return directly.
             // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will 
be emptied.
             if (!table.spec().isPartitioned()) {
-                OverwriteFiles overwriteFiles = table.newOverwrite();
+                OverwriteFiles overwriteFiles = 
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
                 try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
                     fileScanTasks.forEach(f -> 
overwriteFiles.deleteFile(f.file()));
                 } catch (IOException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to