This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 31a7060dbdb88c8e81be00aada4a2df35eca625c
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Apr 10 15:09:01 2024 +0800

    [testcase](hive) add exception test for hive txn (#33278)
    
    Issue #31442
    #32726
    
    1. Add LocalDfsFileSystem to manipulate local files.
    2. Add HMSCachedClientTest to simulate HMS services in memory.
    3. Add a test for rolling back a failed commit (a usage sketch follows the diffstat below).
---
 .../doris/datasource/hive/HMSTransaction.java      |  91 +++--
 .../doris/datasource/hive/HiveMetadataOps.java     |  14 +-
 .../org/apache/doris/datasource/hive/HiveUtil.java |  65 +++-
 .../datasource/hive/ThriftHMSCachedClient.java     |  76 +----
 .../main/java/org/apache/doris/fs/FileSystem.java  |   9 +
 .../org/apache/doris/fs/LocalDfsFileSystem.java    | 245 ++++++++++++++
 .../java/org/apache/doris/fs/LocalFileSystem.java  |  76 -----
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  45 +++
 .../doris/datasource/HMSCachedClientTest.java      | 328 ++++++++++++++++++
 .../doris/datasource/hive/HmsCommitTest.java       | 374 ++++++++++++++++-----
 10 files changed, 1033 insertions(+), 290 deletions(-)
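
A minimal sketch (not part of this patch) of how the pieces above combine in a
unit test. LocalDfsFileSystem, HMSCachedClientTest, the test-only HiveMetadataOps
constructor, and the doNothing() commit hook are all introduced by this diff; the
committer class name HMSTransaction.HmsCommitter is inferred from the hmsCommitter
variable, and the transaction setup and commit entry points are assumptions for
illustration only:

    // Wire up the test doubles added by this patch (wiring assumed).
    LocalDfsFileSystem fs = new LocalDfsFileSystem();       // local-disk FileSystem
    HMSCachedClient client = new HMSCachedClientTest();     // in-memory HMS stand-in
    HiveMetadataOps ops = new HiveMetadataOps(null, client, fs);

    // Make the last step of doCommit() fail so that rollback() must run.
    new MockUp<HMSTransaction.HmsCommitter>() {
        @Mock
        public void doNothing() {
            throw new RuntimeException("simulated failure during commit");
        }
    };

    HMSTransaction txn = new HMSTransaction(ops);           // constructor shape assumed
    try {
        txn.finishInsertTable("test_db", "test_tb_with_partition");
        txn.commit();                                       // commit entry point assumed
        Assert.fail("commit should fail and trigger rollback");
    } catch (Exception e) {
        // partitions added and statistics updated before the failure should
        // have been undone by hmsCommitter.rollback()
    }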

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index c3e8d00c5d1..84221b74e7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -23,8 +23,8 @@ package org.apache.doris.datasource.hive;
 
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.Pair;
+import org.apache.doris.fs.FileSystem;
 import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUpdateMode;
 import org.apache.doris.transaction.Transaction;
@@ -47,11 +47,13 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,7 +65,7 @@ import java.util.stream.Collectors;
 public class HMSTransaction implements Transaction {
     private static final Logger LOG = LogManager.getLogger(HMSTransaction.class);
     private final HiveMetadataOps hiveOps;
-    private final RemoteFileSystem fs;
+    private final FileSystem fs;
     private String dbName;
     private String tbName;
 
@@ -115,8 +117,8 @@ public class HMSTransaction implements Transaction {
     }
 
     public void finishInsertTable(String dbName, String tbName) {
-        this.tbName = tbName;
         this.dbName = dbName;
+        this.tbName = tbName;
         List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
         Table table = getTable(dbName, tbName);
         List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
@@ -226,17 +228,10 @@ public class HMSTransaction implements Transaction {
                 }
             }
 
-            hmsCommitter.waitForAsyncFileSystemTasks();
-            hmsCommitter.doAddPartitionsTask();
-            hmsCommitter.doUpdateStatisticsTasks();
+            hmsCommitter.doCommit();
         } catch (Throwable t) {
             LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
-            hmsCommitter.cancelUnStartedAsyncFileSystemTask();
-            hmsCommitter.undoUpdateStatisticsTasks();
-            hmsCommitter.undoAddPartitionsTask();
-            hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable();
-            hmsCommitter.runDirectoryClearUpTasksForAbort();
-            hmsCommitter.runRenameDirTasksForAbort();
+            hmsCommitter.rollback();
             throw t;
         } finally {
             hmsCommitter.runClearPathsForFinish();
@@ -354,7 +349,7 @@ public class HMSTransaction implements Transaction {
         }
     }
 
-    private static class UpdateStatisticsTask {
+    public static class UpdateStatisticsTask {
         private final String dbName;
         private final String tableName;
         private final Optional<String> partitionName;
@@ -442,7 +437,6 @@ public class HMSTransaction implements Transaction {
                     throw t;
                 }
             }
-            partitions.clear();
         }
 
         public List<List<String>> rollback(HiveMetadataOps hiveOps) {
@@ -548,7 +542,7 @@ public class HMSTransaction implements Transaction {
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
         try {
-            if (!fs.exists(directory.getName()).ok()) {
+            if (!fs.exists(directory.toString()).ok()) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
             }
         } catch (Exception e) {
@@ -561,57 +555,53 @@ public class HMSTransaction implements Transaction {
     }
 
     private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
-        List<RemoteFile> remoteFiles = new ArrayList<>();
-
-        Status status = fs.list(directory.getName(), remoteFiles);
-        if (!status.ok()) {
+        List<RemoteFile> allFiles = new ArrayList<>();
+        Set<String> allDirs = new HashSet<>();
+        Status statusFile = fs.listFiles(directory.toString(), allFiles);
+        Status statusDir = fs.listDirectories(directory.toString(), allDirs);
+        if (!statusFile.ok() || !statusDir.ok()) {
             ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
             notDeletedEligibleItems.add(directory + "/*");
             return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
         }
 
-        boolean isEmptyDir = true;
-        List<String> notDeletedEligibleItems = new ArrayList<>();
-        for (RemoteFile file : remoteFiles) {
-            if (file.isFile()) {
-                Path filePath = file.getPath();
-                isEmptyDir = false;
-                // TODO Check if this file was created by this query
-                if (!deleteIfExists(filePath)) {
-                    notDeletedEligibleItems.add(filePath.toString());
-                }
-            } else if (file.isDirectory()) {
-                DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
-                if (!subResult.dirNotExists()) {
-                    isEmptyDir = false;
-                }
-                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
-                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
-                }
-            } else {
-                isEmptyDir = false;
-                notDeletedEligibleItems.add(file.getPath().toString());
+        boolean allDescendentsDeleted = true;
+        ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+        for (RemoteFile file : allFiles) {
+            String fileName = file.getName();
+            if (!deleteIfExists(new Path(fileName))) {
+                allDescendentsDeleted = false;
+                notDeletedEligibleItems.add(fileName);
             }
         }
 
-        if (isEmptyDir && deleteEmptyDir) {
-            Verify.verify(notDeletedEligibleItems.isEmpty());
+        for (String dir : allDirs) {
+            DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir);
+            if (!subResult.dirNotExists()) {
+                allDescendentsDeleted = false;
+            }
+            if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
+                notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
+            }
+        }
+
+        if (allDescendentsDeleted && deleteEmptyDir) {
+            Verify.verify(notDeletedEligibleItems.build().isEmpty());
             if (!deleteIfExists(directory)) {
                 return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
             }
             // all items of the location have been deleted.
             return new DeleteRecursivelyResult(true, ImmutableList.of());
         }
-
-        return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
+        return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
     }
 
     public boolean deleteIfExists(Path path) {
-        Status status = fs.delete(path.getName());
+        Status status = fs.delete(path.toString());
         if (status.ok()) {
             return true;
         }
-        return !fs.exists(path.getName()).ok();
+        return !fs.exists(path.toString()).ok();
     }
 
     public static class DatabaseTableName {
@@ -1039,9 +1029,6 @@ public class HMSTransaction implements Transaction {
         }
 
         private void undoAddPartitionsTask() {
-            if (addPartitionsTask.isEmpty()) {
-                return;
-            }
 
             HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
             String dbName = firstPartition.getDbName();
@@ -1304,10 +1291,16 @@ public class HMSTransaction implements Transaction {
             }
         }
 
+        public void doNothing() {
+            // No-op hook: regression tests and unit tests mock this method
+            // to inject an exception during commit.
+        }
+
         public void doCommit() {
             waitForAsyncFileSystemTasks();
             doAddPartitionsTask();
             doUpdateStatisticsTasks();
+            doNothing();
         }
 
         public void rollback() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index f3556d13a57..5cf362508e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -34,7 +34,7 @@ import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
-import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fs.FileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -58,7 +58,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
-    private final RemoteFileSystem fs;
+    private final FileSystem fs;
     private final HMSExternalCatalog catalog;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
@@ -75,11 +75,19 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         this.fs = new DFSFileSystem(catalog.getProperties());
     }
 
+    // for test
+    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) {
+        this.catalog = catalog;
+        this.client = client;
+        this.fs = fs;
+    }
+
+
     public HMSCachedClient getClient() {
         return client;
     }
 
-    public RemoteFileSystem getFs() {
+    public FileSystem getFs() {
         return fs;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
index 0dc7eb5a386..dfbfe786985 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java
@@ -25,10 +25,12 @@ import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -167,7 +169,9 @@ public final class HiveUtil {
         ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
         for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
             Partition partition = partitionValuesToPartition.get(entry.getValue());
-            resultBuilder.put(entry.getKey(), partition);
+            if (partition != null) {
+                resultBuilder.put(entry.getKey(), partition);
+            }
         }
         return resultBuilder.build();
     }
@@ -267,4 +271,63 @@ public final class HiveUtil {
         database.setDescription(hiveDb.getComment());
         return database;
     }
+
+    public static Map<String, String> updateStatisticsParameters(
+            Map<String, String> parameters,
+            HiveCommonStatistics statistics) {
+        HashMap<String, String> result = new HashMap<>(parameters);
+
+        result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
+        result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
+        result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
+
+        // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
+        // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
+        if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
+            result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
+        }
+
+        return result;
+    }
+
+    public static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
+        long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
+        long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
+        long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
+        return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
+    }
+
+    public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
+        Partition partition =
+                toMetastoreApiPartition(partitionWithStatistics.getPartition());
+        partition.setParameters(updateStatisticsParameters(
+                partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
+        return partition;
+    }
+
+    public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
+        Partition result = new Partition();
+        result.setDbName(hivePartition.getDbName());
+        result.setTableName(hivePartition.getTblName());
+        result.setValues(hivePartition.getPartitionValues());
+        result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
+        result.setParameters(hivePartition.getParameters());
+        return result;
+    }
+
+    public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
+        SerDeInfo serdeInfo = new SerDeInfo();
+        serdeInfo.setName(partition.getTblName());
+        serdeInfo.setSerializationLib(partition.getSerde());
+
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setLocation(Strings.emptyToNull(partition.getPath()));
+        sd.setCols(partition.getColumns());
+        sd.setSerdeInfo(serdeInfo);
+        sd.setInputFormat(partition.getInputFormat());
+        sd.setOutputFormat(partition.getOutputFormat());
+        sd.setParameters(ImmutableMap.of());
+
+        return sd;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 00fc0d03fa8..9fae854645b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -27,11 +27,8 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
 import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -67,7 +62,6 @@ import org.apache.logging.log4j.Logger;
 import java.security.PrivilegedExceptionAction;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -605,11 +599,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
 
             Table originTable = getTable(dbName, tableName);
             Map<String, String> originParams = originTable.getParameters();
-            HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
+            HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
 
             Table newTable = originTable.deepCopy();
             Map<String, String> newParams =
-                    updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+                    HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
             newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
             newTable.setParameters(newParams);
             client.client.alter_table(dbName, tableName, newTable);
@@ -633,11 +627,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
 
             Partition originPartition = partitions.get(0);
             Map<String, String> originParams = originPartition.getParameters();
-            HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
+            HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
 
             Partition modifiedPartition = originPartition.deepCopy();
             Map<String, String> newParams =
-                    updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+                    HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
             newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
             modifiedPartition.setParameters(newParams);
             client.client.alter_partition(dbName, tableName, modifiedPartition);
@@ -650,7 +644,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
     public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
         try (ThriftHMSClient client = getClient()) {
             List<Partition> hivePartitions = partitions.stream()
-                    .map(ThriftHMSCachedClient::toMetastoreApiPartition)
+                    .map(HiveUtil::toMetastoreApiPartition)
                     .collect(Collectors.toList());
             client.client.add_partitions(hivePartitions);
         } catch (Exception e) {
@@ -666,64 +660,4 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
             throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName);
         }
     }
-
-    private static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
-        long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
-        long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
-        long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
-        return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
-    }
-
-    private static Map<String, String> updateStatisticsParameters(
-            Map<String, String> parameters,
-            HiveCommonStatistics statistics) {
-        HashMap<String, String> result = new HashMap<>(parameters);
-
-        result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
-        result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
-        result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
-
-        // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
-        // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
-        if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
-            result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
-        }
-
-        return result;
-    }
-
-    public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
-        Partition partition =
-                toMetastoreApiPartition(partitionWithStatistics.getPartition());
-        partition.setParameters(updateStatisticsParameters(
-                partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
-        return partition;
-    }
-
-    public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
-        Partition result = new Partition();
-        result.setDbName(hivePartition.getDbName());
-        result.setTableName(hivePartition.getTblName());
-        result.setValues(hivePartition.getPartitionValues());
-        result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
-        result.setParameters(hivePartition.getParameters());
-        return result;
-    }
-
-    private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
-        SerDeInfo serdeInfo = new SerDeInfo();
-        serdeInfo.setName(partition.getTblName());
-        serdeInfo.setSerializationLib(partition.getSerde());
-
-        StorageDescriptor sd = new StorageDescriptor();
-        sd.setLocation(Strings.emptyToNull(partition.getPath()));
-        sd.setCols(partition.getColumns());
-        sd.setSerdeInfo(serdeInfo);
-        sd.setInputFormat(partition.getInputFormat());
-        sd.setOutputFormat(partition.getOutputFormat());
-        sd.setParameters(ImmutableMap.of());
-
-        return sd;
-    }
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 0470d8b3714..369fc917d77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -23,6 +23,7 @@ import org.apache.doris.fs.remote.RemoteFile;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,4 +87,12 @@ public interface FileSystem {
     }
 
     Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
+
+    default Status listFiles(String remotePath, List<RemoteFile> result) {
+        throw new UnsupportedOperationException("Unsupported operation list files on current file system.");
+    }
+
+    default Status listDirectories(String remotePath, Set<String> result) {
+        throw new UnsupportedOperationException("Unsupported operation list directories on current file system.");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
new file mode 100644
index 00000000000..0faf1916db0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LocalDfsFileSystem implements FileSystem {
+
+    public LocalFileSystem fs = LocalFileSystem.getLocal(new Configuration());
+
+    public LocalDfsFileSystem() throws IOException {
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return null;
+    }
+
+    @Override
+    public Status exists(String remotePath) {
+        boolean exists = false;
+        try {
+            exists = fs.exists(new Path(remotePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        if (exists) {
+            return Status.OK;
+        } else {
+            return new Status(Status.ErrCode.NOT_FOUND, "");
+        }
+    }
+
+    @Override
+    public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
+        return null;
+    }
+
+    @Override
+    public Status upload(String localPath, String remotePath) {
+        return null;
+    }
+
+    @Override
+    public Status directUpload(String content, String remoteFile) {
+        return null;
+    }
+
+    @Override
+    public Status rename(String origFilePath, String destFilePath) {
+        try {
+            fs.rename(new Path(origFilePath), new Path(destFilePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public Status renameDir(String origFilePath, String destFilePath, Runnable runWhenPathNotExist) {
+        Status status = exists(destFilePath);
+        if (status.ok()) {
+            throw new RuntimeException("Destination directory already exists: " + destFilePath);
+        }
+        String targetParent = new Path(destFilePath).getParent().toString();
+        status = exists(targetParent);
+        if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
+            status = makeDir(targetParent);
+        }
+        if (!status.ok()) {
+            throw new RuntimeException(status.getErrMsg());
+        }
+
+        runWhenPathNotExist.run();
+
+        return rename(origFilePath, destFilePath);
+    }
+
+    @Override
+    public void asyncRename(Executor executor,
+                            List<CompletableFuture<?>> renameFileFutures,
+                            AtomicBoolean cancelled,
+                            String origFilePath,
+                            String destFilePath,
+                            List<String> fileNames)  {
+        for (String fileName : fileNames) {
+            Path source = new Path(origFilePath, fileName);
+            Path target = new Path(destFilePath, fileName);
+            renameFileFutures.add(CompletableFuture.runAsync(() -> {
+                if (cancelled.get()) {
+                    return;
+                }
+                Status status = rename(source.toString(), target.toString());
+                if (!status.ok()) {
+                    throw new RuntimeException(status.getErrMsg());
+                }
+            }, executor));
+        }
+    }
+
+    @Override
+    public void asyncRenameDir(Executor executor,
+                               List<CompletableFuture<?>> renameFileFutures,
+                               AtomicBoolean cancelled,
+                               String origFilePath,
+                               String destFilePath,
+                               Runnable runWhenPathNotExist) {
+        renameFileFutures.add(CompletableFuture.runAsync(() -> {
+            if (cancelled.get()) {
+                return;
+            }
+            Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
+            if (!status.ok()) {
+                throw new RuntimeException(status.getErrMsg());
+            }
+        }, executor));
+    }
+
+    @Override
+    public Status delete(String remotePath) {
+        try {
+            fs.delete(new Path(remotePath), true);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public Status makeDir(String remotePath) {
+        try {
+            fs.mkdirs(new Path(remotePath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
+        return null;
+    }
+
+    @Override
+    public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+        try {
+            FileStatus[] locatedFileStatusRemoteIterator = fs.listStatus(new Path(remotePath));
+            if (locatedFileStatusRemoteIterator == null) {
+                return Status.OK;
+            }
+            for (FileStatus fileStatus : locatedFileStatusRemoteIterator) {
+                RemoteFile remoteFile = new RemoteFile(
+                        fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
+                        !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
+                        fileStatus.getBlockSize(), fileStatus.getModificationTime());
+                result.add(remoteFile);
+            }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public Status listFiles(String remotePath, List<RemoteFile> result) {
+        RemoteIterator<LocatedFileStatus> iterator;
+        try {
+            Path dirPath = new Path(remotePath);
+            iterator = fs.listFiles(dirPath, true);
+            while (iterator.hasNext()) {
+                LocatedFileStatus next = iterator.next();
+                String location = next.getPath().toString();
+                String child = location.substring(dirPath.toString().length());
+                while (child.startsWith("/")) {
+                    child = child.substring(1);
+                }
+                if (!child.contains("/")) {
+                    result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
+                }
+            }
+        } catch (IOException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public Status listDirectories(String remotePath, Set<String> result) {
+        try {
+            FileStatus[] fileStatuses = fs.listStatus(new Path(remotePath));
+            result.addAll(
+                    Arrays.stream(fileStatuses)
+                        .filter(FileStatus::isDirectory)
+                        .map(file -> file.getPath().toString() + "/")
+                        .collect(ImmutableSet.toImmutableSet()));
+        } catch (IOException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
+
+    public void createFile(String path) throws IOException {
+        Path path1 = new Path(path);
+        if (!exists(path1.getParent().toString()).ok()) {
+            makeDir(path1.getParent().toString());
+        }
+        FSDataOutputStream build = fs.createFile(path1).build();
+        build.close();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java
deleted file mode 100644
index 1baaf9bd2f7..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs;
-
-import org.apache.doris.backup.Status;
-import org.apache.doris.fs.remote.RemoteFile;
-
-import java.util.List;
-import java.util.Map;
-
-public class LocalFileSystem implements FileSystem {
-    @Override
-    public Status exists(String remotePath) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status upload(String localPath, String remotePath) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status directUpload(String content, String remoteFile) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status rename(String origFilePath, String destFilePath) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status delete(String remotePath) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status makeDir(String remotePath) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-
-    @Override
-    public Map<String, String> getProperties() {
-        throw new UnsupportedOperationException("Unsupported operation on local file system.");
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index fabc341389e..7e3032ca807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -31,12 +31,15 @@ import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -51,9 +54,11 @@ import java.nio.ByteBuffer;
 import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -523,4 +528,44 @@ public class DFSFileSystem extends RemoteFileSystem {
         }
         return Status.OK;
     }
+
+    @Override
+    public Status listFiles(String remotePath, List<RemoteFile> result) {
+        RemoteIterator<LocatedFileStatus> iterator;
+        try {
+            FileSystem fileSystem = nativeFileSystem(remotePath);
+            Path dirPath = new Path(remotePath);
+            iterator = fileSystem.listFiles(dirPath, true);
+            while (iterator.hasNext()) {
+                LocatedFileStatus next = iterator.next();
+                String location = next.getPath().toString();
+                String child = location.substring(dirPath.toString().length());
+                while (child.startsWith("/")) {
+                    child = child.substring(1);
+                }
+                if (!child.contains("/")) {
+                    result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
+                }
+            }
+        } catch (Exception e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
+
+    @Override
+    public Status listDirectories(String remotePath, Set<String> result) {
+        try {
+            FileSystem fileSystem = nativeFileSystem(remotePath);
+            FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
+            result.addAll(
+                    Arrays.stream(fileStatuses)
+                        .filter(FileStatus::isDirectory)
+                        .map(file -> file.getPath().toString() + "/")
+                        .collect(ImmutableSet.toImmutableSet()));
+        } catch (Exception e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
+        }
+        return Status.OK;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java
new file mode 100644
index 00000000000..126df780dbf
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/HMSCachedClientTest.java
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.datasource.hive.HMSCachedClient;
+import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.hive.HiveDatabaseMetadata;
+import org.apache.doris.datasource.hive.HivePartitionStatistics;
+import org.apache.doris.datasource.hive.HivePartitionWithStatistics;
+import org.apache.doris.datasource.hive.HiveTableMetadata;
+import org.apache.doris.datasource.hive.HiveUtil;
+import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HMSCachedClientTest implements HMSCachedClient {
+
+    public Map<HMSTransaction.DatabaseTableName, List<Partition>> partitions = new ConcurrentHashMap<>();
+    public Map<String, List<Table>> tables = new HashMap<>();
+    public List<Database> dbs = new ArrayList<>();
+
+    @Override
+    public Database getDatabase(String dbName) {
+        for (Database db : this.dbs) {
+            if (db.getName().equals(dbName)) {
+                return db;
+            }
+        }
+        throw new RuntimeException("can't found database: " + dbName);
+    }
+
+    @Override
+    public List<String> getAllDatabases() {
+        return null;
+    }
+
+    @Override
+    public List<String> getAllTables(String dbName) {
+        return null;
+    }
+
+    @Override
+    public boolean tableExists(String dbName, String tblName) {
+        List<Table> tablesList = getTableList(dbName);
+        for (Table table : tablesList) {
+            if (table.getTableName().equals(tblName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public List<String> listPartitionNames(String dbName, String tblName) {
+        List<Partition> partitionList = getPartitionList(dbName, tblName);
+        ArrayList<String> ret = new ArrayList<>();
+        for (Partition partition : partitionList) {
+            StringBuilder names = new StringBuilder();
+            List<String> values = partition.getValues();
+            for (int i = 0; i < values.size(); i++) {
+                names.append(values.get(i));
+                if (i < values.size() - 1) {
+                    names.append("/");
+                }
+            }
+            ret.add(names.toString());
+        }
+        return ret;
+    }
+
+    @Override
+    public List<Partition> listPartitions(String dbName, String tblName) {
+        return getPartitionList(dbName, tblName);
+    }
+
+    @Override
+    public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
+        return listPartitionNames(dbName, tblName);
+    }
+
+    @Override
+    public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
+        synchronized (this) {
+            List<Partition> partitionList = getPartitionList(dbName, tblName);
+            for (Partition partition : partitionList) {
+                if (partition.getValues().equals(partitionValues)) {
+                    return partition;
+                }
+            }
+            throw new RuntimeException("can't found partition");
+        }
+    }
+
+    @Override
+    public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
+        synchronized (this) {
+            List<Partition> partitionList = getPartitionList(dbName, tblName);
+            ArrayList<Partition> ret = new ArrayList<>();
+            List<List<String>> partitionValuesList =
+                    partitionNames
+                            .stream()
+                            .map(HiveUtil::toPartitionValues)
+                            .collect(Collectors.toList());
+            partitionValuesList.forEach(values -> {
+                for (Partition partition : partitionList) {
+                    if (partition.getValues().equals(values)) {
+                        ret.add(partition);
+                        break;
+                    }
+                }
+            });
+            return ret;
+        }
+    }
+
+    @Override
+    public Table getTable(String dbName, String tblName) {
+        List<Table> tablesList = getTableList(dbName);
+        for (Table table : tablesList) {
+            if (table.getTableName().equals(tblName)) {
+                return table;
+            }
+        }
+        throw new RuntimeException("can't found table: " + tblName);
+    }
+
+    @Override
+    public List<FieldSchema> getSchema(String dbName, String tblName) {
+        return null;
+    }
+
+    @Override
+    public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
+        return null;
+    }
+
+    @Override
+    public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tblName, List<String> partNames, List<String> columns) {
+        return null;
+    }
+
+    @Override
+    public CurrentNotificationEventId getCurrentNotificationEventId() {
+        return null;
+    }
+
+    @Override
+    public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException {
+        return null;
+    }
+
+    @Override
+    public long openTxn(String user) {
+        return 0;
+    }
+
+    @Override
+    public void commitTxn(long txnId) {
+
+    }
+
+    @Override
+    public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
+        return null;
+    }
+
+    @Override
+    public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List<String> partitionNames, long timeoutMs) {
+
+    }
+
+    @Override
+    public String getCatalogLocation(String catalogName) {
+        return null;
+    }
+
+    @Override
+    public void createDatabase(DatabaseMetadata db) {
+        dbs.add(HiveUtil.toHiveDatabase((HiveDatabaseMetadata) db));
+        tables.put(db.getDbName(), new ArrayList<>());
+    }
+
+    @Override
+    public void dropDatabase(String dbName) {
+        Database db = getDatabase(dbName);
+        this.dbs.remove(db);
+    }
+
+    @Override
+    public void dropTable(String dbName, String tableName) {
+        Table table = getTable(dbName, tableName);
+        this.tables.get(dbName).remove(table);
+        this.partitions.remove(new HMSTransaction.DatabaseTableName(dbName, tableName));
+    }
+
+    @Override
+    public void createTable(TableMetadata tbl, boolean ignoreIfExists) {
+        String dbName = tbl.getDbName();
+        String tbName = tbl.getTableName();
+        if (tableExists(dbName, tbName)) {
+            throw new RuntimeException("Table '" + tbName + "' has existed in 
'" + dbName + "'.");
+        }
+
+        List<Table> tableList = getTableList(tbl.getDbName());
+        tableList.add(HiveUtil.toHiveTable((HiveTableMetadata) tbl));
+        HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tbName);
+        partitions.put(key, new ArrayList<>());
+    }
+
+    @Override
+    public void updateTableStatistics(String dbName, String tableName, Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+        synchronized (this) {
+            Table originTable = getTable(dbName, tableName);
+            Map<String, String> originParams = originTable.getParameters();
+            HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+
+            Table newTable = originTable.deepCopy();
+            Map<String, String> newParams =
+                    HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+            newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
+            newTable.setParameters(newParams);
+            List<Table> tableList = getTableList(dbName);
+            tableList.remove(originTable);
+            tableList.add(newTable);
+        }
+    }
+
+    @Override
+    public void updatePartitionStatistics(String dbName, String tableName, String partitionName, Function<HivePartitionStatistics, HivePartitionStatistics> update) {
+
+        synchronized (this) {
+            List<Partition> partitions = getPartitions(dbName, tableName, ImmutableList.of(partitionName));
+            if (partitions.size() != 1) {
+                throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName);
+            }
+
+            Partition originPartition = partitions.get(0);
+            Map<String, String> originParams = originPartition.getParameters();
+            HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+
+            Partition modifiedPartition = originPartition.deepCopy();
+            Map<String, String> newParams =
+                    HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
+            newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
+            modifiedPartition.setParameters(newParams);
+
+            List<Partition> partitionList = getPartitionList(dbName, tableName);
+            partitionList.remove(originPartition);
+            partitionList.add(modifiedPartition);
+        }
+    }
+
+    @Override
+    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
+        synchronized (this) {
+            List<Partition> partitionList = getPartitionList(dbName, tableName);
+            List<Partition> hivePartitions = partitions.stream()
+                    .map(HiveUtil::toMetastoreApiPartition)
+                    .collect(Collectors.toList());
+            partitionList.addAll(hivePartitions);
+        }
+    }
+
+    @Override
+    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
+        synchronized (this) {
+            List<Partition> partitionList = getPartitionList(dbName, tableName);
+            for (int j = 0; j < partitionList.size(); j++) {
+                Partition partition = partitionList.get(j);
+                if (partition.getValues().equals(partitionValues)) {
+                    partitionList.remove(partition);
+                    return;
+                }
+            }
+            throw new RuntimeException("can't found the partition");
+        }
+    }
+
+    public List<Partition> getPartitionList(String dbName, String tableName) {
+        HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tableName);
+        List<Partition> partitionList = this.partitions.get(key);
+        if (partitionList == null) {
+            throw new RuntimeException("can't find table: " + key);
+        }
+        return partitionList;
+    }
+
+    public List<Table> getTableList(String dbName) {
+        List<Table> tablesList = this.tables.get(dbName);
+        if (tablesList == null) {
+            throw new RuntimeException("can't found database: " + dbName);
+        }
+        return tablesList;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index fc939625ea9..4ec6ca84c52 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.doris.datasource.hive;
 
-import org.apache.doris.backup.Status;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.datasource.HMSCachedClientTest;
+import org.apache.doris.fs.LocalDfsFileSystem;
 import org.apache.doris.thrift.THiveLocationParams;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TUpdateMode;
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TUpdateMode;
 import com.google.common.collect.Lists;
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.After;
@@ -35,59 +36,57 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-@Ignore
 public class HmsCommitTest {
 
-    private static HMSExternalCatalog hmsCatalog;
     private static HiveMetadataOps hmsOps;
     private static HMSCachedClient hmsClient;
     private static final String dbName = "test_db";
     private static final String tbWithPartition = "test_tb_with_partition";
     private static final String tbWithoutPartition = "test_tb_without_partition";
-    private static Path warehousePath;
+    private static LocalDfsFileSystem fs;
     static String dbLocation;
-    private String fileFormat = "orc";
+    static String writeLocation;
+    static String uri = "thrift://127.0.0.1:9083";
+    static boolean hasRealHmsService = false;
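+    // Set hasRealHmsService to true and point 'uri' at a live metastore to run these tests against a real HMS.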
 
     @BeforeClass
     public static void beforeClass() throws Throwable {
-        warehousePath = Files.createTempDirectory("test_warehouse_");
+        Path warehousePath = Files.createTempDirectory("test_warehouse_");
+        Path writePath = Files.createTempDirectory("test_write_");
         dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
+        writeLocation = "file://" + writePath.toAbsolutePath() + "/";
         createTestHiveCatalog();
         createTestHiveDatabase();
-        mockFs();
     }
 
     @AfterClass
     public static void afterClass() {
-        hmsClient.dropTable(dbName, tbWithPartition);
-        hmsClient.dropTable(dbName, tbWithoutPartition);
         hmsClient.dropDatabase(dbName);
     }
 
-    public static void createTestHiveCatalog() {
-        Map<String, String> props = new HashMap<>();
-        props.put("type", "hms");
-        props.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
-        props.put("hadoop.username", "hadoop");
-    hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, "comment");
-        hmsCatalog.setInitialized();
-        hmsCatalog.initLocalObjectsImpl();
-        hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps();
-        hmsClient = hmsOps.getClient();
+    public static void createTestHiveCatalog() throws IOException {
+        fs = new LocalDfsFileSystem();
+
+        if (hasRealHmsService) {
+            // If you have a real HMS service, you can use this client to create real connections for testing.
+            HiveConf entries = new HiveConf();
+            entries.set("hive.metastore.uris", uri);
+            hmsClient = new ThriftHMSCachedClient(entries, 2);
+        } else {
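+            // Otherwise fall back to the in-memory mock client, which emulates HMS behavior.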
+            hmsClient = new HMSCachedClientTest();
+        }
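+        // Build the metadata ops directly on the chosen client and a local file system;
+        // no external catalog object is needed for these tests, hence the null argument.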
+        hmsOps = new HiveMetadataOps(null, hmsClient, fs);
     }
 
     public static void createTestHiveDatabase() {
@@ -98,53 +97,31 @@ public class HmsCommitTest {
         hmsClient.createDatabase(dbMetadata);
     }
 
-    public static void mockFs() {
-
-        new MockUp<DFSFileSystem>(DFSFileSystem.class) {
-            @Mock
-            public void asyncRenameDir(Executor executor,
-                                       List<CompletableFuture<?>> renameFileFutures,
-                                       AtomicBoolean cancelled,
-                                       String origFilePath,
-                                       String destFilePath,
-                                       Runnable runWhenPathNotExist) {
-            }
-
-            @Mock
-            public void asyncRename(Executor executor,
-                                    List<CompletableFuture<?>> renameFileFutures,
-                                    AtomicBoolean cancelled,
-                                    String origFilePath,
-                                    String destFilePath,
-                                    List<String> fileNames) {
-            }
-
-            @Mock
-            public Status renameDir(String origFilePath,
-                                    String destFilePath,
-                                    Runnable runWhenPathNotExist) {
-                return Status.OK;
-            }
-        };
-    }
-
     @Before
     public void before() {
-        // create table
+        // create table for tbWithPartition
         List<Column> columns = new ArrayList<>();
         columns.add(new Column("c1", PrimitiveType.INT, true));
         columns.add(new Column("c2", PrimitiveType.STRING, true));
         columns.add(new Column("c3", PrimitiveType.STRING, false));
         List<String> partitionKeys = new ArrayList<>();
         partitionKeys.add("c3");
+        String fileFormat = "orc";
+        HashMap<String, String> params = new HashMap<String, String>() {{
+                put("location_uri", dbLocation + tbWithPartition);
+            }};
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
                 dbName, tbWithPartition, columns, partitionKeys,
-                new HashMap<>(), fileFormat);
+                params, fileFormat);
         hmsClient.createTable(tableMetadata, true);
 
+        // create table for tbWithoutPartition
+        HashMap<String, String> params2 = new HashMap<String, String>() {{
+                put("location_uri", dbLocation + tbWithPartition);
+            }};
         HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
-                dbName, tbWithoutPartition, columns, new ArrayList<>(),
-                new HashMap<>(), fileFormat);
+                dbName, tbWithoutPartition, columns, new ArrayList<>(),
+                params2, fileFormat);
         hmsClient.createTable(tableMetadata2, true);
 
     }
@@ -156,45 +133,45 @@ public class HmsCommitTest {
     }
 
     @Test
-    public void testNewPartitionForUnPartitionedTable() {
+    public void testNewPartitionForUnPartitionedTable() throws IOException {
         List<THivePartitionUpdate> pus = new ArrayList<>();
-        pus.add(createRandomNew("a"));
+        pus.add(createRandomNew(null));
         Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus));
     }
 
     @Test
-    public void testAppendPartitionForUnPartitionedTable() {
+    public void testAppendPartitionForUnPartitionedTable() throws IOException {
         List<THivePartitionUpdate> pus = new ArrayList<>();
-        pus.add(createRandomAppend(""));
-        pus.add(createRandomAppend(""));
-        pus.add(createRandomAppend(""));
+        pus.add(createRandomAppend(null));
+        pus.add(createRandomAppend(null));
+        pus.add(createRandomAppend(null));
         commit(dbName, tbWithoutPartition, pus);
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(3, table);
 
         List<THivePartitionUpdate> pus2 = new ArrayList<>();
-        pus2.add(createRandomAppend(""));
-        pus2.add(createRandomAppend(""));
-        pus2.add(createRandomAppend(""));
+        pus2.add(createRandomAppend(null));
+        pus2.add(createRandomAppend(null));
+        pus2.add(createRandomAppend(null));
         commit(dbName, tbWithoutPartition, pus2);
         table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(6, table);
     }
 
     @Test
-    public void testOverwritePartitionForUnPartitionedTable() {
+    public void testOverwritePartitionForUnPartitionedTable() throws IOException {
         testAppendPartitionForUnPartitionedTable();
         List<THivePartitionUpdate> pus = new ArrayList<>();
-        pus.add(createRandomOverwrite(""));
-        pus.add(createRandomOverwrite(""));
-        pus.add(createRandomOverwrite(""));
+        pus.add(createRandomOverwrite(null));
+        pus.add(createRandomOverwrite(null));
+        pus.add(createRandomOverwrite(null));
         commit(dbName, tbWithoutPartition, pus);
         Table table = hmsClient.getTable(dbName, tbWithoutPartition);
         assertNumRows(3, table);
     }
 
     @Test
-    public void testNewPartitionForPartitionedTable() {
+    public void testNewPartitionForPartitionedTable() throws IOException {
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomNew("a"));
         pus.add(createRandomNew("a"));
@@ -213,7 +190,7 @@ public class HmsCommitTest {
     }
 
     @Test
-    public void testAppendPartitionForPartitionedTable() {
+    public void testAppendPartitionForPartitionedTable() throws IOException {
         testNewPartitionForPartitionedTable();
 
         List<THivePartitionUpdate> pus = new ArrayList<>();
@@ -234,7 +211,7 @@ public class HmsCommitTest {
     }
 
     @Test
-    public void testOverwritePartitionForPartitionedTable() {
+    public void testOverwritePartitionForPartitionedTable() throws IOException {
         testAppendPartitionForPartitionedTable();
         List<THivePartitionUpdate> pus = new ArrayList<>();
         pus.add(createRandomOverwrite("a"));
@@ -251,7 +228,7 @@ public class HmsCommitTest {
     }
 
     @Test
-    public void testNewManyPartitionForPartitionedTable() {
+    public void testNewManyPartitionForPartitionedTable() throws IOException {
         List<THivePartitionUpdate> pus = new ArrayList<>();
         int nums = 150;
         for (int i = 0; i < nums; i++) {
@@ -265,12 +242,30 @@ public class HmsCommitTest {
         }
 
         try {
-            commit(dbName, tbWithPartition, pus);
+            commit(dbName, tbWithPartition, Collections.singletonList(createRandomNew("1")));
         } catch (Exception e) {
             Assert.assertTrue(e.getMessage().contains("failed to add 
partitions"));
         }
     }
 
+    @Test
+    public void testErrorPartitionTypeFromHmsCheck() throws IOException {
+        // first add three partitions: a, b, c
+        testNewPartitionForPartitionedTable();
+
+        // then append two partitions: a, x
+        // since there is no 'x' partition in the existing table, the pre-commit
+        // check against HMS will throw an exception
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomAppend("a"));
+        pus.add(createRandomAppend("x"));
+
+        Assert.assertThrows(
+                Exception.class,
+                () -> commit(dbName, tbWithPartition, pus)
+        );
+    }
+
     public void assertNumRows(long expected, Partition p) {
         Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows")));
     }
@@ -279,40 +274,62 @@ public class HmsCommitTest {
         Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows")));
     }
 
-    public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) {
+    public THivePartitionUpdate genOnePartitionUpdate(TUpdateMode mode) throws IOException {
+        return genOnePartitionUpdate("", mode);
+    }
+
+    public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) throws IOException {
 
         String uuid = UUID.randomUUID().toString();
         THiveLocationParams location = new THiveLocationParams();
-        String targetPath = dbLocation + uuid;
+        String targetPath = dbLocation + uuid + "/" + partitionValue;
+
         location.setTargetPath(targetPath);
-        location.setWritePath(targetPath);
+        location.setWritePath(writeLocation + partitionValue);
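+        // The write path is a staging directory separate from the target path,
+        // so a successful commit must move the staged files into the target location.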
 
         THivePartitionUpdate pu = new THivePartitionUpdate();
-        pu.setName(partitionValue);
+        if (partitionValue != null) {
+            pu.setName(partitionValue);
+        }
         pu.setUpdateMode(mode);
         pu.setRowCount(1);
         pu.setFileSize(1);
         pu.setLocation(location);
+        String f1 = uuid + "f1";
+        String f2 = uuid + "f2";
+        String f3 = uuid + "f3";
+
         pu.setFileNames(new ArrayList<String>() {
             {
-                add(targetPath + "/f1");
-                add(targetPath + "/f2");
-                add(targetPath + "/f3");
+                add(f1);
+                add(f2);
+                add(f3);
             }
         });
+
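+        // Pre-create the target directory only for APPEND/OVERWRITE;
+        // a NEW partition's target path must not exist before the commit.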
+        if (mode != TUpdateMode.NEW) {
+            fs.makeDir(targetPath);
+        }
+
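+        // Stage the three data files in the write location, as if an insert had produced them.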
+        fs.createFile(writeLocation + partitionValue + "/" + f1);
+        fs.createFile(writeLocation + partitionValue + "/" + f2);
+        fs.createFile(writeLocation + partitionValue + "/" + f3);
         return pu;
     }
 
-    public THivePartitionUpdate createRandomNew(String partition) {
-        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
+    public THivePartitionUpdate createRandomNew(String partition) throws IOException {
+        return partition == null ? genOnePartitionUpdate(TUpdateMode.NEW) :
+            genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
     }
 
-    public THivePartitionUpdate createRandomAppend(String partition) {
-        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
+    public THivePartitionUpdate createRandomAppend(String partition) throws IOException {
+        return partition == null ? genOnePartitionUpdate(TUpdateMode.APPEND) :
+            genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
     }
 
-    public THivePartitionUpdate createRandomOverwrite(String partition) {
-        return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
+    public THivePartitionUpdate createRandomOverwrite(String partition) throws IOException {
+        return partition == null ? genOnePartitionUpdate(TUpdateMode.OVERWRITE) :
+            genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
     }
 
     public void commit(String dbName,
@@ -323,4 +340,181 @@ public class HmsCommitTest {
         hmsTransaction.finishInsertTable(dbName, tableName);
         hmsTransaction.commit();
     }
+
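+    // The three mocks below each fail one commit stage after running the caller's assertions,
+    // so every test can check the file-system and HMS state reached just before rollback starts.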
+    public void mockAddPartitionTaskException(Runnable runnable) {
+        new MockUp<HMSTransaction.AddPartitionsTask>(HMSTransaction.AddPartitionsTask.class) {
+            @Mock
+            private void run(HiveMetadataOps hiveOps) {
+                runnable.run();
+                throw new RuntimeException("failed to add partition");
+            }
+        };
+    }
+
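+    // HmsCommitter.doNothing() runs only after the partition-add and statistics-update work
+    // has finished (the tests below assert both), so failing here exercises rollback at the last stage.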
+    public void mockDoOther(Runnable runnable) {
+        new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) {
+            @Mock
+            private void doNothing() {
+                runnable.run();
+                throw new RuntimeException("failed to do nothing");
+            }
+        };
+    }
+
+    public void mockUpdateStatisticsTaskException(Runnable runnable) {
+        new MockUp<HMSTransaction.UpdateStatisticsTask>(HMSTransaction.UpdateStatisticsTask.class) {
+            @Mock
+            private void run(HiveMetadataOps hiveOps) {
+                runnable.run();
+                throw new RuntimeException("failed to update partition");
+            }
+        };
+    }
+
+    @Test
+    public void testRollbackNewPartitionForPartitionedTableForFilesystem() throws IOException {
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For a new partition, the target path must not exist yet
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockAddPartitionTaskException(() -> {
+            // By this point in the commit, these files should have been renamed successfully
+            String targetPath = location.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+            }
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.fail("commit should have thrown");
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files should have been deleted
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+    }
+
+    @Test
+    public void testRollbackNewPartitionForPartitionedTableWithNewPartition() throws IOException {
+        // first create three partitions: a,b,c
+        testNewPartitionForPartitionedTable();
+
+        // second, add a 'new partition' for 'x'
+        //         and an 'append partition' for 'a'
+        // during 'doCommit', 'new partition' is executed before 'append partition',
+        // so on 'rollback' the 'x' partition is first added and then deleted
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("x"));
+        pus.add(createRandomAppend("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For a new partition, the target path must not exist yet
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockUpdateStatisticsTaskException(() -> {
+            // By this point in the commit, these files should have been renamed successfully
+            String targetPath = location.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+            }
+            // the 'new partition' task runs before 'append partition',
+            // so the new partition is already visible here
+            Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
+            assertNumRows(1, px);
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.fail("commit should have thrown");
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files should have been deleted
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+        // the 'x' partition should have been deleted
+        Assert.assertThrows(
+                "the 'x' partition should be deleted",
+                Exception.class,
+                () -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
+        );
+    }
+
+    @Test
+    public void testRollbackNewPartitionForPartitionedTableWithNewAppendPartition() throws IOException {
+        // first create three partitions: a,b,c
+        testNewPartitionForPartitionedTable();
+
+        // second, add a 'new partition' for 'x'
+        //         and an 'append partition' for 'a'
+        List<THivePartitionUpdate> pus = new ArrayList<>();
+        pus.add(createRandomNew("x"));
+        pus.add(createRandomAppend("a"));
+
+        THiveLocationParams location = pus.get(0).getLocation();
+
+        // For a new partition, the target path must not exist yet
+        Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
+        Assert.assertTrue(fs.exists(location.getWritePath()).ok());
+
+        mockDoOther(() -> {
+            // By this point in the commit, these files should have been renamed successfully
+            String targetPath = location.getTargetPath();
+            Assert.assertTrue(fs.exists(targetPath).ok());
+            for (String file : pus.get(0).getFileNames()) {
+                Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
+            }
+            // the 'new partition' task has been executed, so the new partition is visible
+            Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
+            assertNumRows(1, px);
+            // the 'append partition' task has also been executed, so the updated statistics are visible
+            Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+            assertNumRows(4, pa);
+        });
+
+        try {
+            commit(dbName, tbWithPartition, pus);
+            Assert.fail("commit should have thrown");
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // After rollback, these files should have been deleted
+        String targetPath = location.getTargetPath();
+        Assert.assertFalse(fs.exists(targetPath).ok());
+        for (String file : pus.get(0).getFileNames()) {
+            Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
+        }
+        // the 'x' partition should have been deleted
+        Assert.assertThrows(
+                "the 'x' partition should be deleted",
+                Exception.class,
+                () -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
+        );
+        // the 'a' partition should be rolled back
+        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
+        assertNumRows(3, pa);
+    }
 }

