This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e644e7f2e53 branch-3.0: [fix](hive)Clear processed tasks #45309 (#45336) e644e7f2e53 is described below commit e644e7f2e53967cc7ab77e86aec6b6efec5fe326 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Dec 20 00:31:44 2024 +0800 branch-3.0: [fix](hive)Clear processed tasks #45309 (#45336) Cherry-picked from #45309 Co-authored-by: wuwenchi <wuwen...@selectdb.com> --- .../doris/datasource/hive/HMSTransaction.java | 12 +++++++ .../doris/datasource/hive/HmsCommitTest.java | 40 +++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 02c99a695c8..9b88f7a8dea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -534,6 +534,11 @@ public class HMSTransaction implements Transaction { return partitions; } + public void clear() { + partitions.clear(); + createdPartitionValues.clear(); + } + public void addPartition(HivePartitionWithStatistics partition) { partitions.add(partition); } @@ -1143,6 +1148,7 @@ public class HMSTransaction implements Transaction { for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) { MoreFutures.getFutureValue(undoUpdateFuture); } + updateStatisticsTasks.clear(); } private void undoAddPartitionsTask() { @@ -1157,6 +1163,7 @@ public class HMSTransaction implements Transaction { LOG.warn("Failed to rollback: add_partition for partition values {}.{}", tableInfo, rollbackFailedPartitions); } + addPartitionsTask.clear(); } private void waitForAsyncFileSystemTaskSuppressThrowable() { @@ -1169,6 +1176,7 @@ public class HMSTransaction implements Transaction { // ignore } } + asyncFileSystemTaskFutures.clear(); } public void prepareInsertExistingTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) { @@ -1319,6 +1327,7 @@ public class HMSTransaction implements Transaction { for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) { recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir(), false); } + directoryCleanUpTasksForAbort.clear(); } private void runRenameDirTasksForAbort() { @@ -1334,6 +1343,7 @@ public class HMSTransaction implements Transaction { } } } + renameDirectoryTasksForAbort.clear(); } private void runClearPathsForFinish() { @@ -1486,6 +1496,7 @@ public class HMSTransaction implements Transaction { .build()); }, fileSystemExecutor)); } + uncompletedMpuPendingUploads.clear(); } public void doNothing() { @@ -1520,6 +1531,7 @@ public class HMSTransaction implements Transaction { for (CompletableFuture<?> future : asyncFileSystemTaskFutures) { MoreFutures.getFutureValue(future, RuntimeException.class); } + asyncFileSystemTaskFutures.clear(); } public void shutdownExecutorService() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 0e79f8ee2b5..61b373706d9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -675,5 +675,43 @@ public class HmsCommitTest { Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); assertNumRows(3, pa); } -} + @Test + public void testCommitWithRollback() { + genQueryID(); + List<THivePartitionUpdate> pus = new ArrayList<>(); + try { + pus.add(createRandomAppend(null)); + pus.add(createRandomAppend(null)); + pus.add(createRandomAppend(null)); + } catch (Throwable t) { + Assert.fail(); + } + + mockDoOther(() -> { + Table table = hmsClient.getTable(dbName, tbWithoutPartition); + assertNumRows(3, table); + }); + + HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor); + try { + hmsTransaction.setHivePartitionUpdates(pus); + HiveInsertCommandContext ctx = new HiveInsertCommandContext(); + String queryId = DebugUtil.printId(ConnectContext.get().queryId()); + ctx.setQueryId(queryId); + ctx.setWritePath(getWritePath()); + hmsTransaction.beginInsertTable(ctx); + hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, tbWithoutPartition)); + hmsTransaction.commit(); + Assert.fail(); + } catch (Throwable t) { + Assert.assertTrue(t.getMessage().contains("failed to do nothing")); + } + + try { + hmsTransaction.rollback(); + } catch (Throwable t) { + Assert.fail(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org