This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 483c56ea147 [fix](hive)Clear processed tasks (#45309)
483c56ea147 is described below

commit 483c56ea1476a94586d7ff719f0e27416f4d2700
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Thu Dec 12 12:28:08 2024 +0800

    [fix](hive)Clear processed tasks (#45309)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    Clear processed tasks; otherwise they will be executed twice.
---
 .../doris/datasource/hive/HMSTransaction.java      | 12 +++++++
 .../doris/datasource/hive/HmsCommitTest.java       | 40 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 02c99a695c8..9b88f7a8dea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -534,6 +534,11 @@ public class HMSTransaction implements Transaction {
             return partitions;
         }
 
+        public void clear() {
+            partitions.clear();
+            createdPartitionValues.clear();
+        }
+
         public void addPartition(HivePartitionWithStatistics partition) {
             partitions.add(partition);
         }
@@ -1143,6 +1148,7 @@ public class HMSTransaction implements Transaction {
             for (CompletableFuture<?> undoUpdateFuture : 
undoUpdateFutures.build()) {
                 MoreFutures.getFutureValue(undoUpdateFuture);
             }
+            updateStatisticsTasks.clear();
         }
 
         private void undoAddPartitionsTask() {
@@ -1157,6 +1163,7 @@ public class HMSTransaction implements Transaction {
                 LOG.warn("Failed to rollback: add_partition for partition 
values {}.{}",
                         tableInfo, rollbackFailedPartitions);
             }
+            addPartitionsTask.clear();
         }
 
         private void waitForAsyncFileSystemTaskSuppressThrowable() {
@@ -1169,6 +1176,7 @@ public class HMSTransaction implements Transaction {
                     // ignore
                 }
             }
+            asyncFileSystemTaskFutures.clear();
         }
 
         public void prepareInsertExistingTable(SimpleTableInfo tableInfo, 
TableAndMore tableAndMore) {
@@ -1319,6 +1327,7 @@ public class HMSTransaction implements Transaction {
             for (DirectoryCleanUpTask cleanUpTask : 
directoryCleanUpTasksForAbort) {
                 recursiveDeleteItems(cleanUpTask.getPath(), 
cleanUpTask.isDeleteEmptyDir(), false);
             }
+            directoryCleanUpTasksForAbort.clear();
         }
 
         private void runRenameDirTasksForAbort() {
@@ -1334,6 +1343,7 @@ public class HMSTransaction implements Transaction {
                     }
                 }
             }
+            renameDirectoryTasksForAbort.clear();
         }
 
         private void runClearPathsForFinish() {
@@ -1486,6 +1496,7 @@ public class HMSTransaction implements Transaction {
                             .build());
                 }, fileSystemExecutor));
             }
+            uncompletedMpuPendingUploads.clear();
         }
 
         public void doNothing() {
@@ -1520,6 +1531,7 @@ public class HMSTransaction implements Transaction {
             for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
                 MoreFutures.getFutureValue(future, RuntimeException.class);
             }
+            asyncFileSystemTaskFutures.clear();
         }
 
         public void shutdownExecutorService() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 0e79f8ee2b5..61b373706d9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -675,5 +675,43 @@ public class HmsCommitTest {
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("a"));
         assertNumRows(3, pa);
     }
-}
 
+    @Test
+    public void testCommitWithRollback() {
+        genQueryID();
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        try {
+            pus.add(createRandomAppend(null));
+            pus.add(createRandomAppend(null));
+            pus.add(createRandomAppend(null));
+        } catch (Throwable t) {
+            Assert.fail();
+        }
+
+        mockDoOther(() -> {
+            Table table = hmsClient.getTable(dbName, tbWithoutPartition);
+            assertNumRows(3, table);
+        });
+
+        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, 
fileSystemProvider, fileSystemExecutor);
+        try {
+            hmsTransaction.setHivePartitionUpdates(pus);
+            HiveInsertCommandContext ctx = new HiveInsertCommandContext();
+            String queryId = DebugUtil.printId(ConnectContext.get().queryId());
+            ctx.setQueryId(queryId);
+            ctx.setWritePath(getWritePath());
+            hmsTransaction.beginInsertTable(ctx);
+            hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, 
tbWithoutPartition));
+            hmsTransaction.commit();
+            Assert.fail();
+        } catch (Throwable t) {
+            Assert.assertTrue(t.getMessage().contains("failed to do nothing"));
+        }
+
+        try {
+            hmsTransaction.rollback();
+        } catch (Throwable t) {
+            Assert.fail();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to