This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 03994b8d939 branch-2.1: [fix](iceberg)Fix the thread pool issue used
for commit. #51508 (#51528)
03994b8d939 is described below
commit 03994b8d9397100f4526b4fc6b03eac828ba5699
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 11 06:31:40 2025 +0800
branch-2.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508
(#51528)
Cherry-picked from #51508
---------
Co-authored-by: wuwenchi <[email protected]>
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
.../org/apache/doris/common/ThreadPoolManager.java | 68 ++++++++++++++++++++++
.../apache/doris/datasource/ExternalCatalog.java | 12 ++++
.../doris/datasource/hive/HMSExternalCatalog.java | 6 ++
.../datasource/iceberg/IcebergExternalCatalog.java | 7 +++
.../datasource/iceberg/IcebergMetadataOps.java | 5 ++
.../datasource/iceberg/IcebergTransaction.java | 4 +-
6 files changed, 100 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 3be5af8ac54..0061e3702a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.common;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -68,6 +70,7 @@ import java.util.function.Supplier;
*/
public class ThreadPoolManager {
+ private static final Logger LOG =
LogManager.getLogger(ThreadPoolManager.class);
private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap =
Maps.newConcurrentMap();
@@ -126,6 +129,50 @@ public class ThreadPoolManager {
new LogDiscardPolicyThrowException(poolName), poolName,
needRegisterMetric);
}
+ public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth( // Fixed-size variant: core and max pool size are both numThread.
+ int numThread, // used as both corePoolSize and maximumPoolSize
+ int queueSize, // capacity of the LinkedBlockingQueue that holds pending tasks
+ String poolName, // given to BlockedPolicy and forwarded for thread naming
+ boolean needRegisterMetric, // forwarded unchanged to newDaemonThreadPoolWithPreAuth
+ PreExecutionAuthenticator preAuth) { // forwarded unchanged; consumed by the thread factory
+ return newDaemonThreadPoolWithPreAuth(numThread, numThread,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize), new
BlockedPolicy(poolName, 60), // NOTE(review): meaning of 60 is set by BlockedPolicy, not visible here — confirm
+ poolName, needRegisterMetric, preAuth);
+ }
+
+ public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth( // Creates a ThreadPoolExecutor whose threads are built by namedThreadFactoryWithPreAuth.
+ int corePoolSize, // this and the next five arguments are handed straight to the ThreadPoolExecutor constructor
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler,
+ String poolName, // thread-name prefix and, when registered, the key in nameToThreadPoolMap
+ boolean needRegisterMetric, // true => the pool is stored in nameToThreadPoolMap
+ PreExecutionAuthenticator preAuth) { // passed to the thread factory created on the next line
+ ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName,
preAuth);
+ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
+ keepAliveTime, unit, workQueue, threadFactory, handler);
+ if (needRegisterMetric) {
+ nameToThreadPoolMap.put(poolName, threadPool); // put() replaces any earlier pool stored under the same name
+ }
+ return threadPool;
+ }
+
+ private static ThreadFactory namedThreadFactoryWithPreAuth(String
poolName, PreExecutionAuthenticator preAuth) { // Builds a factory for daemon threads named "<poolName>-<n>".
+ return new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(poolName + "-%d")
+ .setThreadFactory(runnable -> new Thread(() -> { // backing factory: wrap the runnable instead of running it directly
+ try {
+ preAuth.execute(runnable); // the thread's whole body runs through the authenticator
+ } catch (Exception e) {
+ throw new RuntimeException(e); // rethrown unchecked with the original exception as cause
+ }
+ }))
+ .build();
+ }
+
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
@@ -377,4 +424,25 @@ public class ThreadPoolManager {
}
}
}
+
+ public static void shutdownExecutorService(ExecutorService
executorService) { // Orderly shutdown first, forced shutdown if it does not finish within the wait.
+ // Disable new tasks from being submitted
+ executorService.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ // Cancel currently executing tasks
+ executorService.shutdownNow();
+ // Wait a while for tasks to respond to being cancelled
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOG.warn("ExecutorService did not terminate"); // only logged; the method still returns normally
+ }
+ }
+ } catch (InterruptedException e) {
+ // (Re-)Cancel if current thread also interrupted
+ executorService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bd3d966ac37..6052be35f57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.Text;
@@ -91,6 +92,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
@@ -117,6 +119,8 @@ public abstract class ExternalCatalog
CREATE_TIME,
USE_META_CACHE);
+ protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM =
Runtime.getRuntime().availableProcessors();
+
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
@@ -157,6 +161,7 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected PreExecutionAuthenticator preExecutionAuthenticator;
+ protected ThreadPoolExecutor threadPoolWithPreAuth;
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];
@@ -759,6 +764,9 @@ public abstract class ExternalCatalog
if (null != transactionManager) {
transactionManager = null;
}
+ if (threadPoolWithPreAuth != null) {
+ ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+ }
CatalogIf.super.onClose();
}
@@ -1170,4 +1178,8 @@ public abstract class ExternalCatalog
tableAutoAnalyzePolicy.put(key, policy);
}
}
+
+ public ThreadPoolExecutor getThreadPoolExecutor() { // Accessor for the threadPoolWithPreAuth field.
+ return threadPoolWithPreAuth; // NOTE(review): can be null — the field has no initializer and onClose() null-checks it
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index defa2a8ebe9..73598adfcaf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -197,6 +197,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
String.valueOf(Config.hive_metastore_client_timeout_second));
}
HiveMetadataOps hiveOps =
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+ threadPoolWithPreAuth =
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+ ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
+ String.format("hms_iceberg_catalog_%s_executor_pool", name),
+ true,
+ preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(),
this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 30590f5af26..e25199adfbe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.doris.common.ThreadPoolManager;
import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -65,6 +66,12 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
initCatalog();
IcebergMetadataOps ops =
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager =
TransactionManagerFactory.createIcebergTransactionManager(ops);
+ threadPoolWithPreAuth =
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+ ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
+ String.format("iceberg_catalog_%s_executor_pool", name),
+ true,
+ preExecutionAuthenticator);
metadataOps = ops;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 4c06068aa3b..78c4ef8fec2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -291,4 +292,8 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
private Namespace getNamespace() {
return externalCatalogName.map(Namespace::of).orElseGet(() ->
Namespace.empty());
}
+
+ public ThreadPoolExecutor getThreadPoolWithPreAuth() { // Exposes dorisCatalog's executor to holders of this ops object.
+ return dorisCatalog.getThreadPoolExecutor(); // pure delegation; the result is whatever dorisCatalog returns
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e..797caea0dea 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
private void commitAppendTxn(Table table, List<WriteResult>
pendingResults) {
// commit append files.
- AppendFiles appendFiles = table.newAppend();
+ AppendFiles appendFiles =
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
// 1. if dst_tb is a partitioned table, it will return directly.
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will
be emptied.
if (!table.spec().isPartitioned()) {
- OverwriteFiles overwriteFiles = table.newOverwrite();
+ OverwriteFiles overwriteFiles =
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
fileScanTasks.forEach(f ->
overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]