This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d7e08fcad [fix](iceberg)Fix the thread pool issue used for commit. 
(#51508)
77d7e08fcad is described below

commit 77d7e08fcad913739a9f5762bf9fa275ff9d4beb
Author: wuwenchi <[email protected]>
AuthorDate: Thu Jun 5 17:35:31 2025 +0800

    [fix](iceberg)Fix the thread pool issue used for commit. (#51508)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    When Iceberg generates a new snapshot, it performs a merge operation
    based on the previous snapshot. This operation reads manifest files, and
    the file reading process uses a global thread pool. However, users may
    have their own authentication information, which requires the use of
    doAs to ensure context. Therefore, the thread pool provided by Iceberg
    cannot be used.
    
    ### Release note
    
    None
---
 .../src/main/java/org/apache/doris/datasource/ExternalCatalog.java   | 4 ++++
 .../java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java | 5 +++++
 .../java/org/apache/doris/datasource/iceberg/IcebergTransaction.java | 4 ++--
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index cc0f0676796..15a45d11a36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1304,4 +1304,8 @@ public abstract class ExternalCatalog
             Env.getCurrentEnv().getExtMetaCacheMgr().invalidSchemaCache(id);
         }
     }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() {
+        return threadPoolWithPreAuth;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 6a8698af9ef..70953c689a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 public class IcebergMetadataOps implements ExternalMetadataOps {
@@ -387,4 +388,8 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
     private Namespace getNamespace() {
         return externalCatalogName.map(Namespace::of).orElseGet(() -> 
Namespace.empty());
     }
+
+    public ThreadPoolExecutor getThreadPoolWithPreAuth() {
+        return dorisCatalog.getThreadPoolExecutor();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index e36db86022e..797caea0dea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
 
     private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
         // commit append files.
-        AppendFiles appendFiles = table.newAppend();
+        AppendFiles appendFiles = 
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
             // 1. if dst_tb is a partitioned table, it will return directly.
             // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will 
be emptied.
             if (!table.spec().isPartitioned()) {
-                OverwriteFiles overwriteFiles = table.newOverwrite();
+                OverwriteFiles overwriteFiles = 
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
                 try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
                     fileScanTasks.forEach(f -> 
overwriteFiles.deleteFile(f.file()));
                 } catch (IOException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to