This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit bd2e7d0ec881b6724cde7b31a4a7da4e6dcc1883 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Fri Mar 22 21:32:20 2024 +0800 [feature](hive)support insert overwrite (#32610) support insert overwrite for unpartitioned table and partitioned table. issue: #31442 --- .../apache/doris/datasource/hive/HMSCommitter.java | 96 ++++++++++++++++++++-- .../main/java/org/apache/doris/fs/FileSystem.java | 6 ++ .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 40 +++++---- .../doris/datasource/hive/HmsCommitTest.java | 56 ++++++++++--- 4 files changed, 166 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java index 64abb985fcf..af26f36d6b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java @@ -80,6 +80,8 @@ public class HMSCommitter { private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>(); // when aborted, we need restore directory private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>(); + // when finished, we need clear some directories + private final List<String> clearDirsForFinish = new ArrayList<>(); Executor fileSystemExecutor = Executors.newFixedThreadPool(16); public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) { @@ -105,6 +107,8 @@ public class HMSCommitter { t.addSuppressed(new Exception("Failed to roll back after commit failure", e)); } throw t; + } finally { + runClearPathsForFinish(); } } @@ -250,7 +254,38 @@ public class HMSCommitter { } public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { - + String targetPath = pu.getLocation().getTargetPath(); + String writePath = pu.getLocation().getWritePath(); + if (!targetPath.equals(writePath)) { + Path path = new Path(targetPath); + String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); + Status status = fs.renameDir( + targetPath, + oldTablePath, + () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg()); + } + clearDirsForFinish.add(oldTablePath); + + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.empty(), + ps, + false + )); } public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { @@ -335,7 +370,38 @@ public class HMSCommitter { public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { + String targetPath = pu.getLocation().getTargetPath(); + String writePath = pu.getLocation().getWritePath(); + if (!targetPath.equals(writePath)) { + Path path = new Path(targetPath); + String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString(); + Status status = fs.renameDir( + targetPath, + oldPartitionPath, + () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + targetPath + " to " + oldPartitionPath + ":" + status.getErrMsg()); + } + clearDirsForFinish.add(oldPartitionPath); + status = fs.renameDir( + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + if (!status.ok()) { + throw new RuntimeException( + "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); + } + } + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.of(pu.getName()), + ps, + false + )); } @@ -481,7 +547,7 @@ public class HMSCommitter { createdPartitionValues.add(partition.getPartition().getPartitionValues()); } } catch (Throwable t) { - LOG.error("Failed to add partition", t); + LOG.warn("Failed to add partition", t); throw t; } } @@ -583,7 +649,27 @@ public class HMSCommitter { } private void runRenameDirTasksForAbort() { - // TODO abort + Status status; + for (RenameDirectoryTask task : renameDirectoryTasksForAbort) { + status = fs.exists(task.getRenameFrom()); + if (status.ok()) { + status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {}); + if (!status.ok()) { + LOG.warn("Failed to abort rename dir from {} to {}:{}", + task.getRenameFrom(), task.getRenameTo(), status.getErrMsg()); + } + } + } + } + + private void runClearPathsForFinish() { + Status status; + for (String path : clearDirsForFinish) { + status = fs.delete(path); + if (!status.ok()) { + LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode()); + } + } } @@ -591,10 +677,10 @@ public class HMSCommitter { DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir); if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) { - LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.", + LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.", directory.toString(), deleteResult.getNotDeletedEligibleItems()); } else if (deleteEmptyDir && !deleteResult.dirNotExists()) { - LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString()); + LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 798f93a61c0..0470d8b3714 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -49,6 +49,12 @@ public interface FileSystem { Status rename(String origFilePath, String destFilePath); + default Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + throw new UnsupportedOperationException("Unsupported operation rename dir on current file system."); + } + default void asyncRename(Executor executor, List<CompletableFuture<?>> renameFileFutures, AtomicBoolean cancelled, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index e8c645f3c9b..26008adf38c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -412,6 +412,28 @@ public class DFSFileSystem extends RemoteFileSystem { } } + public Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + Status status = exists(destFilePath); + if (status.ok()) { + throw new RuntimeException("Destination directory already exists: " + destFilePath); + } + + String targetParent = new Path(destFilePath).getParent().toString(); + status = exists(targetParent); + if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { + status = makeDir(targetParent); + } + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + + runWhenPathNotExist.run(); + + return rename(origFilePath, destFilePath); + } + @Override public void asyncRenameDir(Executor executor, List<CompletableFuture<?>> renameFileFutures, @@ -423,23 +445,7 @@ public class DFSFileSystem extends RemoteFileSystem { if (cancelled.get()) { return; } - - Status status = exists(destFilePath); - if (status.ok()) { - throw new RuntimeException("Destination directory already exists: " + destFilePath); - } - - String targetParent = new Path(destFilePath).getParent().toString(); - status = exists(targetParent); - if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { - makeDir(targetParent); - } else if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - - runWhenPathNotExist.run(); - - status = rename(origFilePath, destFilePath); + Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist); if (!status.ok()) { throw new RuntimeException(status.getErrMsg()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 2316e65bf60..1cfb2270f92 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -134,7 +134,7 @@ public class HmsCommitTest { pus.add(createRandomAppend("")); hmsOps.commit(dbName, tbWithoutPartition, pus); Table table = hmsClient.getTable(dbName, tbWithoutPartition); - Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows"))); + assertNumRows(3, table); List<THivePartitionUpdate> pus2 = new ArrayList<>(); pus2.add(createRandomAppend("")); @@ -142,12 +142,19 @@ public class HmsCommitTest { pus2.add(createRandomAppend("")); hmsOps.commit(dbName, tbWithoutPartition, pus2); table = hmsClient.getTable(dbName, tbWithoutPartition); - Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows"))); + assertNumRows(6, table); } @Test public void testOverwritePartitionForUnPartitionedTable() { - // TODO + testAppendPartitionForUnPartitionedTable(); + List<THivePartitionUpdate> pus = new ArrayList<>(); + pus.add(createRandomOverwrite("")); + pus.add(createRandomOverwrite("")); + pus.add(createRandomOverwrite("")); + hmsOps.commit(dbName, tbWithoutPartition, pus); + Table table = hmsClient.getTable(dbName, tbWithoutPartition); + assertNumRows(3, table); } @Test @@ -162,11 +169,11 @@ public class HmsCommitTest { hmsOps.commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); - Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows"))); + assertNumRows(3, pa); Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); - Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows"))); + assertNumRows(2, pb); Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); - Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows"))); + assertNumRows(1, pc); } @Test @@ -183,11 +190,28 @@ public class HmsCommitTest { hmsOps.commit(dbName, tbWithPartition, pus); Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); - Assert.assertEquals(6, Long.parseLong(pa.getParameters().get("numRows"))); + assertNumRows(6, pa); + Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); + assertNumRows(4, pb); + Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); + assertNumRows(2, pc); + } + + @Test + public void testOverwritePartitionForPartitionedTable() { + testAppendPartitionForPartitionedTable(); + List<THivePartitionUpdate> pus = new ArrayList<>(); + pus.add(createRandomOverwrite("a")); + pus.add(createRandomOverwrite("b")); + pus.add(createRandomOverwrite("c")); + hmsOps.commit(dbName, tbWithPartition, pus); + + Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); + assertNumRows(1, pa); Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); - Assert.assertEquals(4, Long.parseLong(pb.getParameters().get("numRows"))); + assertNumRows(1, pb); Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); - Assert.assertEquals(2, Long.parseLong(pc.getParameters().get("numRows"))); + assertNumRows(1, pc); } @Test @@ -201,7 +225,7 @@ public class HmsCommitTest { hmsOps.commit(dbName, tbWithPartition, pus); for (int i = 0; i < nums; i++) { Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i)); - Assert.assertEquals(1, Long.parseLong(p.getParameters().get("numRows"))); + assertNumRows(1, p); } try { @@ -211,6 +235,14 @@ public class HmsCommitTest { } } + public void assertNumRows(long expected, Partition p) { + Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows"))); + } + + public void assertNumRows(long expected, Table t) { + Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows"))); + } + public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) { String uuid = UUID.randomUUID().toString(); @@ -242,4 +274,8 @@ public class HmsCommitTest { public THivePartitionUpdate createRandomAppend(String partition) { return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND); } + + public THivePartitionUpdate createRandomOverwrite(String partition) { + return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org