This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 140fb4c6e69 branch-2.1: [fix](iceberg) Fix transaction issues (#52716) (#53238)
140fb4c6e69 is described below

commit 140fb4c6e693a0df70814f7dc83bef878c125678
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jul 15 20:46:36 2025 -0700

    branch-2.1: [fix](iceberg) Fix transaction issues (#52716) (#53238)
    
    bp #52716
    
    Co-authored-by: wuwenchi <[email protected]>
    Co-authored-by: wuwenchi.wwc <[email protected]>
---
 .../datasource/iceberg/IcebergTransaction.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 797caea0dea..c7d7212335a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -111,8 +111,8 @@ public class IcebergTransaction implements Transaction {
     }
 
     private void updateManifestAfterInsert(TUpdateMode updateMode) {
-        PartitionSpec spec = table.spec();
-        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+        PartitionSpec spec = transaction.table().spec();
+        FileFormat fileFormat = 
IcebergUtils.getFileFormat(transaction.table());
 
         List<WriteResult> pendingResults;
         if (commitDataList.isEmpty()) {
@@ -125,9 +125,9 @@ public class IcebergTransaction implements Transaction {
         }
 
         if (updateMode == TUpdateMode.APPEND) {
-            commitAppendTxn(table, pendingResults);
+            commitAppendTxn(pendingResults);
         } else {
-            commitReplaceTxn(table, pendingResults);
+            commitReplaceTxn(pendingResults);
         }
     }
 
@@ -146,16 +146,15 @@ public class IcebergTransaction implements Transaction {
         return 
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
     }
 
-
     private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
         Objects.requireNonNull(tableInfo);
         ExternalCatalog externalCatalog = ops.getExternalCatalog();
         return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
     }
 
-    private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
+    private void commitAppendTxn(List<WriteResult> pendingResults) {
         // commit append files.
-        AppendFiles appendFiles = 
table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+        AppendFiles appendFiles = 
transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files for append.");
@@ -165,13 +164,15 @@ public class IcebergTransaction implements Transaction {
     }
 
 
-    private void commitReplaceTxn(Table table, List<WriteResult> 
pendingResults) {
+    private void commitReplaceTxn(List<WriteResult> pendingResults) {
         if (pendingResults.isEmpty()) {
             // such as : insert overwrite table `dst_tb` select * from 
`empty_tb`
             // 1. if dst_tb is a partitioned table, it will return directly.
             // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will 
be emptied.
-            if (!table.spec().isPartitioned()) {
-                OverwriteFiles overwriteFiles = 
table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
+            if (!transaction.table().spec().isPartitioned()) {
+                OverwriteFiles overwriteFiles = transaction
+                        .newOverwrite()
+                        .scanManifestsWith(ops.getThreadPoolWithPreAuth());
                 try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
                     fileScanTasks.forEach(f -> 
overwriteFiles.deleteFile(f.file()));
                 } catch (IOException e) {
@@ -183,7 +184,9 @@ public class IcebergTransaction implements Transaction {
         }
 
         // commit replace partitions
-        ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+        ReplacePartitions appendPartitionOp = transaction
+                .newReplacePartitions()
+                .scanManifestsWith(ops.getThreadPoolWithPreAuth());
         for (WriteResult result : pendingResults) {
             Preconditions.checkState(result.referencedDataFiles().length == 0,
                     "Should have no referenced data files.");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to