This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bd2e7d0ec881b6724cde7b31a4a7da4e6dcc1883
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Mar 22 21:32:20 2024 +0800

    [feature](hive) support insert overwrite (#32610)
    
    Support INSERT OVERWRITE for both unpartitioned and partitioned tables.
    
    issue: #31442
---
 .../apache/doris/datasource/hive/HMSCommitter.java | 96 ++++++++++++++++++++--
 .../main/java/org/apache/doris/fs/FileSystem.java  |  6 ++
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  | 40 +++++----
 .../doris/datasource/hive/HmsCommitTest.java       | 56 ++++++++++---
 4 files changed, 166 insertions(+), 32 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
index 64abb985fcf..af26f36d6b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
@@ -80,6 +80,8 @@ public class HMSCommitter {
     private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = 
new ConcurrentLinkedQueue<>();
     // when aborted, we need restore directory
     private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new 
ArrayList<>();
+    // when finished, we need clear some directories
+    private final List<String> clearDirsForFinish = new ArrayList<>();
     Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
 
     public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table 
table) {
@@ -105,6 +107,8 @@ public class HMSCommitter {
                 t.addSuppressed(new Exception("Failed to roll back after 
commit failure", e));
             }
             throw t;
+        } finally {
+            runClearPathsForFinish();
         }
     }
 
@@ -250,7 +254,38 @@ public class HMSCommitter {
     }
 
     public void prepareOverwriteTable(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
-
+        String targetPath = pu.getLocation().getTargetPath();
+        String writePath = pu.getLocation().getWritePath();
+        if (!targetPath.equals(writePath)) {
+            Path path = new Path(targetPath);
+            String oldTablePath = new Path(path.getParent(), "_temp_" + 
path.getName()).toString();
+            Status status = fs.renameDir(
+                    targetPath,
+                    oldTablePath,
+                    () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldTablePath, targetPath)));
+            if (!status.ok()) {
+                throw new RuntimeException(
+                    "Error to rename dir from " + targetPath + " to " + 
oldTablePath + status.getErrMsg());
+            }
+            clearDirsForFinish.add(oldTablePath);
+
+            status =  fs.renameDir(
+                writePath,
+                targetPath,
+                () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
+            if (!status.ok()) {
+                throw new RuntimeException(
+                    "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
+            }
+        }
+        updateStatisticsTasks.add(
+            new UpdateStatisticsTask(
+                table.getDbName(),
+                table.getTableName(),
+                Optional.empty(),
+                ps,
+                false
+            ));
     }
 
     public void prepareCreateNewPartition(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
@@ -335,7 +370,38 @@ public class HMSCommitter {
 
 
     public void prepareOverwritePartition(THivePartitionUpdate pu, 
HivePartitionStatistics ps) {
+        String targetPath = pu.getLocation().getTargetPath();
+        String writePath = pu.getLocation().getWritePath();
+        if (!targetPath.equals(writePath)) {
+            Path path = new Path(targetPath);
+            String oldPartitionPath = new Path(path.getParent(), "_temp_" + 
path.getName()).toString();
+            Status status = fs.renameDir(
+                    targetPath,
+                    oldPartitionPath,
+                    () -> renameDirectoryTasksForAbort.add(new 
RenameDirectoryTask(oldPartitionPath, targetPath)));
+            if (!status.ok()) {
+                throw new RuntimeException(
+                    "Error to rename dir from " + targetPath + " to " + 
oldPartitionPath + ":" + status.getErrMsg());
+            }
+            clearDirsForFinish.add(oldPartitionPath);
 
+            status = fs.renameDir(
+                    writePath,
+                    targetPath,
+                    () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
+            if (!status.ok()) {
+                throw new RuntimeException(
+                    "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
+            }
+        }
+        updateStatisticsTasks.add(
+            new UpdateStatisticsTask(
+                table.getDbName(),
+                table.getTableName(),
+                Optional.of(pu.getName()),
+                ps,
+                false
+            ));
     }
 
 
@@ -481,7 +547,7 @@ public class HMSCommitter {
                         
createdPartitionValues.add(partition.getPartition().getPartitionValues());
                     }
                 } catch (Throwable t) {
-                    LOG.error("Failed to add partition", t);
+                    LOG.warn("Failed to add partition", t);
                     throw t;
                 }
             }
@@ -583,7 +649,27 @@ public class HMSCommitter {
     }
 
     private void runRenameDirTasksForAbort() {
-        // TODO abort
+        Status status;
+        for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
+            status = fs.exists(task.getRenameFrom());
+            if (status.ok()) {
+                status = fs.renameDir(task.getRenameFrom(), 
task.getRenameTo(), () -> {});
+                if (!status.ok()) {
+                    LOG.warn("Failed to abort rename dir from {} to {}:{}",
+                            task.getRenameFrom(), task.getRenameTo(), 
status.getErrMsg());
+                }
+            }
+        }
+    }
+
+    private void runClearPathsForFinish() {
+        Status status;
+        for (String path : clearDirsForFinish) {
+            status = fs.delete(path);
+            if (!status.ok()) {
+                LOG.warn("Failed to recursively delete path {}:{}", path, 
status.getErrCode());
+            }
+        }
     }
 
 
@@ -591,10 +677,10 @@ public class HMSCommitter {
         DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, 
deleteEmptyDir);
 
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
-            LOG.error("Failed to delete directory {}. Some eligible items 
can't be deleted: {}.",
+            LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
                     directory.toString(), 
deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
-            LOG.error("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
+            LOG.warn("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 798f93a61c0..0470d8b3714 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -49,6 +49,12 @@ public interface FileSystem {
 
     Status rename(String origFilePath, String destFilePath);
 
+    default Status renameDir(String origFilePath,
+                          String destFilePath,
+                          Runnable runWhenPathNotExist) {
+        throw new UnsupportedOperationException("Unsupported operation rename 
dir on current file system.");
+    }
+
     default void asyncRename(Executor executor,
                              List<CompletableFuture<?>> renameFileFutures,
                              AtomicBoolean cancelled,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index e8c645f3c9b..26008adf38c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -412,6 +412,28 @@ public class DFSFileSystem extends RemoteFileSystem {
         }
     }
 
+    public Status renameDir(String origFilePath,
+                            String destFilePath,
+                            Runnable runWhenPathNotExist) {
+        Status status = exists(destFilePath);
+        if (status.ok()) {
+            throw new RuntimeException("Destination directory already exists: 
" + destFilePath);
+        }
+
+        String targetParent = new Path(destFilePath).getParent().toString();
+        status = exists(targetParent);
+        if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+            status = makeDir(targetParent);
+        }
+        if (!status.ok()) {
+            throw new RuntimeException(status.getErrMsg());
+        }
+
+        runWhenPathNotExist.run();
+
+        return rename(origFilePath, destFilePath);
+    }
+
     @Override
     public void asyncRenameDir(Executor executor,
                         List<CompletableFuture<?>> renameFileFutures,
@@ -423,23 +445,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             if (cancelled.get()) {
                 return;
             }
-
-            Status status = exists(destFilePath);
-            if (status.ok()) {
-                throw new RuntimeException("Destination directory already 
exists: " + destFilePath);
-            }
-
-            String targetParent = new 
Path(destFilePath).getParent().toString();
-            status = exists(targetParent);
-            if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
-                makeDir(targetParent);
-            } else if (!status.ok()) {
-                throw new RuntimeException(status.getErrMsg());
-            }
-
-            runWhenPathNotExist.run();
-
-            status = rename(origFilePath, destFilePath);
+            Status status = renameDir(origFilePath, destFilePath, 
runWhenPathNotExist);
             if (!status.ok()) {
                 throw new RuntimeException(status.getErrMsg());
             }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 2316e65bf60..1cfb2270f92 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -134,7 +134,7 @@ public class HmsCommitTest {
         pus.add(createRandomAppend(""));
         hmsOps.commit(dbName, tbWithoutPartition, pus);
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
-        Assert.assertEquals(3, 
Long.parseLong(table.getParameters().get("numRows")));
+        assertNumRows(3, table);
 
         List<THivePartitionUpdate> pus2 = new ArrayList<>();
         pus2.add(createRandomAppend(""));
@@ -142,12 +142,19 @@ public class HmsCommitTest {
         pus2.add(createRandomAppend(""));
         hmsOps.commit(dbName, tbWithoutPartition, pus2);
         table = hmsClient.getTable(dbName, tbWithoutPartition);
-        Assert.assertEquals(6, 
Long.parseLong(table.getParameters().get("numRows")));
+        assertNumRows(6, table);
     }
 
     @Test
     public void testOverwritePartitionForUnPartitionedTable() {
-        // TODO
+        testAppendPartitionForUnPartitionedTable();
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomOverwrite(""));
+        pus.add(createRandomOverwrite(""));
+        pus.add(createRandomOverwrite(""));
+        hmsOps.commit(dbName, tbWithoutPartition, pus);
+        Table table = hmsClient.getTable(dbName, tbWithoutPartition);
+        assertNumRows(3, table);
     }
 
     @Test
@@ -162,11 +169,11 @@ public class HmsCommitTest {
         hmsOps.commit(dbName, tbWithPartition, pus);
 
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("a"));
-        Assert.assertEquals(3, 
Long.parseLong(pa.getParameters().get("numRows")));
+        assertNumRows(3, pa);
         Partition pb = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("b"));
-        Assert.assertEquals(2, 
Long.parseLong(pb.getParameters().get("numRows")));
+        assertNumRows(2, pb);
         Partition pc = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("c"));
-        Assert.assertEquals(1, 
Long.parseLong(pc.getParameters().get("numRows")));
+        assertNumRows(1, pc);
     }
 
     @Test
@@ -183,11 +190,28 @@ public class HmsCommitTest {
         hmsOps.commit(dbName, tbWithPartition, pus);
 
         Partition pa = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("a"));
-        Assert.assertEquals(6, 
Long.parseLong(pa.getParameters().get("numRows")));
+        assertNumRows(6, pa);
+        Partition pb = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("b"));
+        assertNumRows(4, pb);
+        Partition pc = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("c"));
+        assertNumRows(2, pc);
+    }
+
+    @Test
+    public void testOverwritePartitionForPartitionedTable() {
+        testAppendPartitionForPartitionedTable();
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomOverwrite("a"));
+        pus.add(createRandomOverwrite("b"));
+        pus.add(createRandomOverwrite("c"));
+        hmsOps.commit(dbName, tbWithPartition, pus);
+
+        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("a"));
+        assertNumRows(1, pa);
         Partition pb = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("b"));
-        Assert.assertEquals(4, 
Long.parseLong(pb.getParameters().get("numRows")));
+        assertNumRows(1, pb);
         Partition pc = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("c"));
-        Assert.assertEquals(2, 
Long.parseLong(pc.getParameters().get("numRows")));
+        assertNumRows(1, pc);
     }
 
     @Test
@@ -201,7 +225,7 @@ public class HmsCommitTest {
         hmsOps.commit(dbName, tbWithPartition, pus);
         for (int i = 0; i < nums; i++) {
             Partition p = hmsClient.getPartition(dbName, tbWithPartition, 
Lists.newArrayList("" + i));
-            Assert.assertEquals(1, 
Long.parseLong(p.getParameters().get("numRows")));
+            assertNumRows(1, p);
         }
 
         try {
@@ -211,6 +235,14 @@ public class HmsCommitTest {
         }
     }
 
+    public void assertNumRows(long expected, Partition p) {
+        Assert.assertEquals(expected, 
Long.parseLong(p.getParameters().get("numRows")));
+    }
+
+    public void assertNumRows(long expected, Table t) {
+        Assert.assertEquals(expected, 
Long.parseLong(t.getParameters().get("numRows")));
+    }
+
     public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, 
TUpdateMode mode) {
 
         String uuid = UUID.randomUUID().toString();
@@ -242,4 +274,8 @@ public class HmsCommitTest {
     public THivePartitionUpdate createRandomAppend(String partition) {
         return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
     }
+
+    public THivePartitionUpdate createRandomOverwrite(String partition) {
+        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to