This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 576d79e619525420bfe90c3eaa69adeffa9f48e1
Author: daidai <changyu...@selectdb.com>
AuthorDate: Fri Jan 3 09:57:21 2025 +0800

    [feature](hive) Support reading Hive 4 transactional tables. (#44001)
    
    Problem Summary:
    Support reading Hive 4 transactional (ACID) tables.
    
    1. Because using the Hive 4 API directly to determine the files that need
    to be read causes compilation problems, a counterpart of
    `ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState`
    is implemented in Doris (see the sketch below).
    
    2. In addition, creating, inserting into, and dropping Hive ACID tables
    is now forbidden.
    
    3. Fix incorrect data being read from tables with
    `'transactional_properties' = 'insert_only'`.
    
    TODO: merge the HMSTransaction and HiveTransaction classes;
          merge the HiveTransactionMgr and HiveTransactionManager classes.
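    
    A minimal sketch (not part of this patch) of how the new pieces fit
    together. It assumes the classes touched in this diff (AcidUtil,
    HMSCachedClient, HivePartition, HiveMetaStoreCache.FileCacheValue); the
    wrapper class and the helper name `listAcidFiles` are hypothetical:
    
        import java.util.Map;
    
        import org.apache.doris.datasource.hive.AcidUtil;
        import org.apache.doris.datasource.hive.HMSCachedClient;
        import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
        import org.apache.doris.datasource.hive.HivePartition;
        import org.apache.doris.fs.FileSystem;
    
        public class AcidReadSketch {
            // Resolve the base/delta files a snapshot read of one partition should see.
            static FileCacheValue listAcidFiles(HMSCachedClient client, FileSystem fs,
                    HivePartition partition, Map<String, String> catalogProps,
                    String fullTableName, long txnId, boolean isFullAcid) throws Exception {
                // getValidWriteIds now returns the validity info as a plain map keyed by
                // AcidUtil.VALID_TXNS_KEY ("hive.txn.valid.txns") and
                // AcidUtil.VALID_WRITEIDS_KEY ("hive.txn.valid.writeids") instead of a
                // ValidWriteIdList object. In the real flow, HiveTransaction acquires a
                // shared lock before this call.
                Map<String, String> txnValidIds = client.getValidWriteIds(fullTableName, txnId);
                // AcidUtil.getAcidState mirrors Hive's AcidUtils#getAcidState: it picks the
                // best base_N directory plus the visible delta/delete_delta directories and
                // returns their files (plus delete-delta info for full-ACID tables).
                return AcidUtil.getAcidState(fs, partition, txnValidIds, catalogProps, isFullAcid);
            }
        }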
    
---
 .../format/table/transactional_hive_reader.cpp     |  33 +-
 .../scripts/create_preinstalled_scripts/run25.hql  |  52 ++
 .../org/apache/doris/datasource/hive/AcidUtil.java | 453 ++++++++++++++++
 .../doris/datasource/hive/HMSCachedClient.java     |   3 +-
 .../doris/datasource/hive/HMSExternalTable.java    |  16 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 144 +----
 .../doris/datasource/hive/HiveMetadataOps.java     |  24 +
 .../doris/datasource/hive/HiveTransaction.java     |  12 +-
 .../hive/PostgreSQLJdbcHMSCachedClient.java        |   3 +-
 .../datasource/hive/ThriftHMSCachedClient.java     |  18 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  16 +-
 .../commands/insert/InsertIntoTableCommand.java    |   3 +
 .../doris/datasource/TestHMSCachedClient.java      |   3 +-
 .../apache/doris/datasource/hive/HiveAcidTest.java | 595 +++++++++++++++++++++
 .../hive/test_transactional_hive.out               | Bin 570 -> 835 bytes
 .../hive/test_hive_translation_insert_only.out     | Bin 0 -> 181 bytes
 .../hive/test_transactional_hive.groovy            |  69 +++
 .../hive/test_hive_translation_insert_only.groovy  |  91 ++++
 18 files changed, 1384 insertions(+), 151 deletions(-)

diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp 
b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index bc4262b7451..18642ab1218 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "transactional_hive_reader.h"
 
+#include <re2/re2.h>
+
 #include "runtime/runtime_state.h"
 #include "transactional_hive_common.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -108,15 +110,38 @@ Status TransactionalHiveReader::init_row_filters(const 
TFileRangeDesc& range,
     int64_t num_delete_files = 0;
     std::filesystem::path file_path(data_file_path);
 
+    // See https://github.com/apache/hive/commit/ffee30e6267e85f00a22767262192abb9681cfb7#diff-5fe26c36b4e029dcd344fc5d484e7347R165
+    // bucket_xxx_attemptId => bucket_xxx
+    // bucket_xxx           => bucket_xxx
+    auto remove_bucket_attemptId = [](const std::string& str) {
+        re2::RE2 pattern("^bucket_\\d+_\\d+$");
+
+        if (re2::RE2::FullMatch(str, pattern)) {
+            size_t pos = str.rfind('_');
+            if (pos != std::string::npos) {
+                return str.substr(0, pos);
+            }
+        }
+        return str;
+    };
+
     SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
     for (auto& delete_delta : 
range.table_format_params.transactional_hive_params.delete_deltas) {
         const std::string file_name = file_path.filename().string();
-        auto iter = std::find(delete_delta.file_names.begin(), 
delete_delta.file_names.end(),
-                              file_name);
-        if (iter == delete_delta.file_names.end()) {
+
+        // TODO: optimize; the stripped file name list is rebuilt for every delete delta.
+        std::vector<std::string> delete_delta_file_names;
+        for (const auto& x : delete_delta.file_names) {
+            delete_delta_file_names.emplace_back(remove_bucket_attemptId(x));
+        }
+        auto iter = std::find(delete_delta_file_names.begin(), 
delete_delta_file_names.end(),
+                              remove_bucket_attemptId(file_name));
+        if (iter == delete_delta_file_names.end()) {
             continue;
         }
-        auto delete_file = fmt::format("{}/{}", 
delete_delta.directory_location, file_name);
+        auto delete_file =
+                fmt::format("{}/{}", delete_delta.directory_location,
+                            delete_delta.file_names[iter - 
delete_delta_file_names.begin()]);
 
         TFileRangeDesc delete_range;
         // must use __set() method to make sure __isset is true
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
 
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
index 814df4cdc5f..da6400bdff0 100755
--- 
a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql
@@ -41,3 +41,55 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) 
values
 (6, 'F');
 
 update orc_full_acid_par set value = 'BB' where id = 2;
+
+
+
+
+create table orc_to_acid_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 
'C');
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');
+
+
+create table orc_to_acid_compacted_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 
'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 
'B');
+ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES 
('transactional'='true');
+ALTER TABLE orc_to_acid_compacted_tb COMPACT 'major';
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 
'D');
+update orc_to_acid_compacted_tb set value = "CC" where id = 3; 
+update orc_to_acid_compacted_tb set value = "BB" where id = 2; 
+
+
+create table orc_acid_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_minor values (1, 'A');
+insert into orc_acid_minor values (2, 'B');
+insert into orc_acid_minor values (3, 'C');
+update orc_acid_minor set value = "BB" where id = 2; 
+ALTER TABLE orc_acid_minor COMPACT 'minor';
+insert into orc_acid_minor values (4, 'D');
+update orc_acid_minor set value = "DD" where id = 4; 
+DELETE FROM orc_acid_minor WHERE id = 3;
+
+
+create table orc_acid_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_major values (1, 'A');
+insert into orc_acid_major values (2, 'B');
+insert into orc_acid_major values (3, 'C');
+update orc_acid_major set value = "BB" where id = 2; 
+ALTER TABLE orc_acid_major COMPACT 'major';
+insert into orc_acid_major values (4, 'D');
+update orc_acid_major set value = "DD" where id = 4; 
+DELETE FROM orc_acid_major WHERE id = 3;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
new file mode 100644
index 00000000000..915a1a410d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidUtil.java
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AcidUtil {
+    private static final Logger LOG = LogManager.getLogger(AcidUtil.class);
+
+    public static final String VALID_TXNS_KEY = "hive.txn.valid.txns";
+    public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids";
+
+    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = 
"bucket_";
+    private static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
+    // An `_orc_acid_version` file is written to each base/delta/delete_delta dir created by a full acid write
+    // or compaction. This is the primary mechanism for versioning acid data.
+    // Each individual ORC file also stores the current version as a user property in its footer. All data files
+    // produced by an acid write should have this (starting with Hive 3.0), including those written by the compactor.
+    // This is mostly a sanity check in case someone moved the files around.
+    // In Hive, the methods for reading the version from files were moved to tests, which is the only place they are
+    // used (after HIVE-23506), to keep devs out of temptation, since they access the FileSystem, which is expensive.
+    // After `HIVE-23825: Create a flag to turn off _orc_acid_version file creation`, a flag controls whether the
+    // `_orc_acid_version` file is generated at all, so we do not need to check that this file exists.
+    private static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
+
+    @Getter
+    @ToString
+    private static class ParsedBase {
+        private final long writeId;
+        private final long visibilityId;
+
+        public ParsedBase(long writeId, long visibilityId) {
+            this.writeId = writeId;
+            this.visibilityId = visibilityId;
+        }
+    }
+
+    private static ParsedBase parseBase(String name) {
+        // format1: base_writeId
+        // format2: base_writeId_visibilityId  (detail: https://issues.apache.org/jira/browse/HIVE-20823)
+        name = name.substring("base_".length());
+        int index = name.indexOf("_v");
+        if (index == -1) {
+            return new ParsedBase(Long.parseLong(name), 0);
+        }
+        return new ParsedBase(
+                Long.parseLong(name.substring(0, index)),
+                Long.parseLong(name.substring(index + 2)));
+    }
+
+    @Getter
+    @ToString
+    @EqualsAndHashCode
+    private static class ParsedDelta implements Comparable<ParsedDelta> {
+        private final long min;
+        private final long max;
+        private final String path;
+        private final int statementId;
+        private final boolean deleteDelta;
+        private final long visibilityId;
+
+        public ParsedDelta(long min, long max, @NonNull String path, int 
statementId,
+                boolean deleteDelta, long visibilityId) {
+            this.min = min;
+            this.max = max;
+            this.path = path;
+            this.statementId = statementId;
+            this.deleteDelta = deleteDelta;
+            this.visibilityId = visibilityId;
+        }
+
+        /*
+         * Smaller minWID orders first;
+         * If minWID is the same, larger maxWID orders first;
+         * Otherwise, sort by stmtID; files w/o stmtID orders first.
+         *
+         * Compactions (Major/Minor) merge deltas/bases but delete of old files
+         * happens in a different process; thus it's possible to have 
bases/deltas with
+         * overlapping writeId boundaries.  The sort order helps figure out 
the "best" set of files
+         * to use to get data.
+         * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts 
before delta_5_10 (and delta_11_20)
+         */
+        @Override
+        public int compareTo(ParsedDelta other) {
+            return min != other.min ? Long.compare(min, other.min) :
+                    other.max != max ? Long.compare(other.max, max) :
+                            statementId != other.statementId
+                                    ? Integer.compare(statementId, 
other.statementId) :
+                                    path.compareTo(other.path);
+        }
+    }
+
+
+    private static boolean isValidMetaDataFile(FileSystem fileSystem, String 
baseDir)
+            throws IOException {
+        String fileLocation = baseDir + "_metadata_acid";
+        Status status = fileSystem.exists(fileLocation);
+        if (status != Status.OK) {
+            return false;
+        }
+        // To save the cost of reading the file content, we only check whether the file exists.
+        // File Contents: {"thisFileVersion":"0","dataFormat":"compacted"}
+        //
+        // Map<String, String> metadata;
+        // try (var in = read(fileLocation)) {
+        //     metadata = new ObjectMapper().readValue(in, new 
TypeReference<>() {});
+        // }
+        // catch (IOException e) {
+        //     throw new IOException(String.format("Failed to read %s: %s", 
fileLocation, e.getMessage()), e);
+        // }
+        //
+        // String version = metadata.get("thisFileVersion");
+        // if (!"0".equals(version)) {
+        //     throw new IOException("Unexpected ACID metadata version: " + 
version);
+        // }
+        //
+        // String format = metadata.get("dataFormat");
+        // if (!"compacted".equals(format)) {
+        //     throw new IOException("Unexpected value for ACID dataFormat: " 
+ format);
+        // }
+        return true;
+    }
+
+    private static boolean isValidBase(FileSystem fileSystem, String baseDir,
+            ParsedBase base, ValidWriteIdList writeIdList) throws IOException {
+        if (base.writeId == Long.MIN_VALUE) {
+            //Ref: https://issues.apache.org/jira/browse/HIVE-13369
+            //such base is created by 1st compaction in case of non-acid to 
acid table conversion.(you
+            //will get dir: `base_-9223372036854775808`)
+            //By definition there are no open txns with id < 1.
+            //After this: https://issues.apache.org/jira/browse/HIVE-18192, 
txns(global transaction ID) => writeId.
+            return true;
+        }
+
+        // Hive 4: just check the "_v" suffix; before Hive 4: check for a `_metadata_acid` file in baseDir.
+        if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, 
baseDir)) {
+            return writeIdList.isValidBase(base.writeId);
+        }
+
+        // if here, it's a result of IOW
+        return writeIdList.isWriteIdValid(base.writeId);
+    }
+
+    private static ParsedDelta parseDelta(String fileName, String deltaPrefix, 
String path) {
+        // format1: delta_min_max_statementId_visibilityId, delete_delta_min_max_statementId_visibilityId
+        //     the _visibilityId suffix may be absent.
+        //     detail: https://issues.apache.org/jira/browse/HIVE-20823
+        // format2: delta_min_max_visibilityId, delete_delta_min_max_visibilityId
+        //     when minor compaction runs, we collapse per statement delta 
files inside a single
+        //     transaction so we no longer need a statementId in the file name
+
+        // String fileName = fileName.substring(name.lastIndexOf('/') + 1);
+        // checkArgument(fileName.startsWith(deltaPrefix), "File does not 
start with '%s': %s", deltaPrefix, path);
+
+        long visibilityId = 0;
+        int visibilityIdx = fileName.indexOf("_v");
+        if (visibilityIdx != -1) {
+            visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 
2));
+            fileName = fileName.substring(0, visibilityIdx);
+        }
+
+        boolean deleteDelta = deltaPrefix.equals("delete_delta_");
+
+        String rest = fileName.substring(deltaPrefix.length());
+        int split = rest.indexOf('_');
+        int split2 = rest.indexOf('_', split + 1);
+        long min = Long.parseLong(rest.substring(0, split));
+
+        if (split2 == -1) {
+            long max = Long.parseLong(rest.substring(split + 1));
+            return new ParsedDelta(min, max, path, -1, deleteDelta, 
visibilityId);
+        }
+
+        long max = Long.parseLong(rest.substring(split + 1, split2));
+        int statementId = Integer.parseInt(rest.substring(split2 + 1));
+        return new ParsedDelta(min, max, path, statementId, deleteDelta, 
visibilityId);
+    }
+
+    public interface FileFilter {
+        public boolean accept(String fileName);
+    }
+
+    public static final class  FullAcidFileFilter implements FileFilter {
+        @Override
+        public boolean accept(String fileName) {
+            return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)
+                    && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX);
+        }
+    }
+
+    public static final class InsertOnlyFileFilter implements FileFilter {
+        @Override
+        public boolean accept(String fileName) {
+            return true;
+        }
+    }
+
+    // The Hive 3 library cannot read Hive 4 transactional tables correctly, and using the Hive 4 library
+    // directly causes many problems, so this method is implemented here.
+    // Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState
+    public static FileCacheValue getAcidState(FileSystem fileSystem, 
HivePartition partition,
+            Map<String, String> txnValidIds, Map<String, String> catalogProps, 
boolean isFullAcid) throws Exception {
+
+        // Ref: https://issues.apache.org/jira/browse/HIVE-18192
+        // Readers should use the combination of ValidTxnList and 
ValidWriteIdList(Table) for snapshot isolation.
+        // ValidReadTxnList implements ValidTxnList
+        // ValidReaderWriteIdList implements ValidWriteIdList
+        ValidTxnList validTxnList = null;
+        if (txnValidIds.containsKey(VALID_TXNS_KEY)) {
+            validTxnList = new ValidReadTxnList();
+            validTxnList.readFromString(
+                    txnValidIds.get(VALID_TXNS_KEY)
+            );
+        } else {
+            throw new RuntimeException("Miss ValidTxnList");
+        }
+
+        ValidWriteIdList validWriteIdList = null;
+        if (txnValidIds.containsKey(VALID_WRITEIDS_KEY)) {
+            validWriteIdList = new ValidReaderWriteIdList();
+            validWriteIdList.readFromString(
+                    txnValidIds.get(VALID_WRITEIDS_KEY)
+            );
+        } else {
+            throw new RuntimeException("Miss ValidWriteIdList");
+        }
+
+        String partitionPath = partition.getPath();
+        //hdfs://xxxxx/user/hive/warehouse/username/data_id=200103
+
+        List<RemoteFile> lsPartitionPath = new ArrayList<>();
+        Status status = fileSystem.globList(partitionPath + "/*", 
lsPartitionPath);
+        // List all files and folders, without recursion.
+        // FileStatus[] lsPartitionPath = null;
+        // fileSystem.listFileStatuses(partitionPath,lsPartitionPath);
+        if (status != Status.OK) {
+            throw new IOException(status.toString());
+        }
+
+        String oldestBase = null;
+        long oldestBaseWriteId = Long.MAX_VALUE;
+        String bestBasePath = null;
+        long bestBaseWriteId = 0;
+        boolean haveOriginalFiles = false;
+        List<ParsedDelta> workingDeltas = new ArrayList<>();
+
+        for (RemoteFile remotePath : lsPartitionPath) {
+            if (remotePath.isDirectory()) {
+                String dirName = remotePath.getName(); //dirName: 
base_xxx,delta_xxx,...
+                String dirPath = partitionPath + "/" + dirName;
+
+                if (dirName.startsWith("base_")) {
+                    ParsedBase base = parseBase(dirName);
+                    if (!validTxnList.isTxnValid(base.visibilityId)) {
+                        // Check visibilityTxnId to see if it is committed in the current snapshot.
+                        continue;
+                    }
+
+                    long writeId = base.writeId;
+                    if (oldestBaseWriteId > writeId) {
+                        oldestBase = dirPath;
+                        oldestBaseWriteId = writeId;
+                    }
+
+                    if (((bestBasePath == null) || (bestBaseWriteId < writeId))
+                            && isValidBase(fileSystem, dirPath, base, 
validWriteIdList)) {
+                        // IOW (insert overwrite) will generate a base_N/ directory: https://issues.apache.org/jira/browse/HIVE-14988
+                        // So we may also need to consider: https://issues.apache.org/jira/browse/HIVE-25777
+
+                        bestBasePath =  dirPath;
+                        bestBaseWriteId = writeId;
+                    }
+                } else if (dirName.startsWith("delta_") || 
dirName.startsWith("delete_delta_")) {
+                    String deltaPrefix = dirName.startsWith("delta_") ? 
"delta_" : "delete_delta_";
+                    ParsedDelta delta = parseDelta(dirName, deltaPrefix, 
dirPath);
+
+                    if (!validTxnList.isTxnValid(delta.visibilityId)) {
+                        continue;
+                    }
+
+                    // No need to check (validWriteIdList.isWriteIdRangeAborted(min, max) != RangeResponse.ALL),
+                    // since it is a subset of (validWriteIdList.isWriteIdRangeValid(min, max) != RangeResponse.NONE).
+                    if (validWriteIdList.isWriteIdRangeValid(delta.min, 
delta.max) != RangeResponse.NONE) {
+                        workingDeltas.add(delta);
+                    }
+                } else {
+                    // Hive sometimes generates temporary directories (`.hive-staging_hive_xxx`),
+                    // which do not need to be read.
+                    LOG.warn("Reading Hive ACID table: ignoring the contents of directory " + dirName);
+                }
+            } else {
+                haveOriginalFiles = true;
+            }
+        }
+
+        if (bestBasePath == null && haveOriginalFiles) {
+            // ALTER TABLE nonAcidTbl SET TBLPROPERTIES ('transactional'='true');
+            throw new UnsupportedOperationException(
+                    "A table converted from non-ACID to ACID must be major compacted (COMPACT 'major') before it can be read.");
+        }
+
+        if ((oldestBase != null) && (bestBasePath == null)) {
+            /*
+             * If here, it means there was a base_x (> 1 perhaps) but none 
were suitable for given
+             * {@link writeIdList}.  Note that 'original' files are logically 
a base_Long.MIN_VALUE and thus
+             * cannot have any data for an open txn.  We could check {@link 
deltas} has files to cover
+             * [1,n] w/o gaps but this would almost never happen...
+             *
+             * We only throw for base_x produced by Compactor since that base 
erases all history and
+             * cannot be used for a client that has a snapshot in which 
something inside this base is
+             * open.  (Nor can we ignore this base of course)  But base_x 
which is a result of IOW,
+             * contains all history so we treat it just like delta wrt 
visibility.  Imagine, IOW which
+             * aborts. It creates a base_x, which can and should just be 
ignored.*/
+            long[] exceptions = validWriteIdList.getInvalidWriteIds();
+            String minOpenWriteId = ((exceptions != null)
+                    && (exceptions.length > 0)) ? 
String.valueOf(exceptions[0]) : "x";
+            throw new IOException(
+                    String.format("Not enough history available for (%d,%s). Oldest available base: %s",
+                            validWriteIdList.getHighWatermark(), 
minOpenWriteId, oldestBase));
+        }
+
+        workingDeltas.sort(null);
+
+        List<ParsedDelta> deltas = new ArrayList<>();
+        long current = bestBaseWriteId;
+        int lastStatementId = -1;
+        ParsedDelta prev = null;
+        // Find the delta/delete_delta files that need to be read.
+        for (ParsedDelta next : workingDeltas) {
+            if (next.max > current) {
+                if (validWriteIdList.isWriteIdRangeValid(current + 1, 
next.max) != RangeResponse.NONE) {
+                    deltas.add(next);
+                    current = next.max;
+                    lastStatementId = next.statementId;
+                    prev = next;
+                }
+            } else if ((next.max == current) && (lastStatementId >= 0)) {
+                //make sure to get all deltas within a single transaction;  
multi-statement txn
+                //generate multiple delta files with the same txnId range
+                //of course, if maxWriteId has already been minor compacted,
+                // all per statement deltas are obsolete
+
+                deltas.add(next);
+                prev = next;
+            } else if ((prev != null)
+                    && (next.max == prev.max)
+                    && (next.min == prev.min)
+                    && (next.statementId == prev.statementId)) {
+                // The 'next' parsedDelta may have everything equal to the 
'prev' parsedDelta, except
+                // the path. This may happen when we have split update and we 
have two types of delta
+                // directories- 'delta_x_y' and 'delete_delta_x_y' for the 
SAME txn range.
+
+                // Also note that any delete_deltas in between a given 
delta_x_y range would be made
+                // obsolete. For example, a delta_30_50 would make 
delete_delta_40_40 obsolete.
+                // This is valid because minor compaction always compacts the 
normal deltas and the delete
+                // deltas for the same range. That is, if we had 3 
directories, delta_30_30,
+                // delete_delta_40_40 and delta_50_50, then running minor 
compaction would produce
+                // delta_30_50 and delete_delta_30_50.
+                deltas.add(next);
+                prev = next;
+            }
+        }
+
+        FileCacheValue fileCacheValue = new FileCacheValue();
+        List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
+
+        FileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new 
InsertOnlyFileFilter();
+
+        // delta directories
+        for (ParsedDelta delta : deltas) {
+            String location = delta.getPath();
+
+            List<RemoteFile> remoteFiles = new ArrayList<>();
+            status = fileSystem.listFiles(location, false, remoteFiles);
+            if (status.ok()) {
+                if (delta.isDeleteDelta()) {
+                    List<String> deleteDeltaFileNames = remoteFiles.stream()
+                            
.map(RemoteFile::getName).filter(fileFilter::accept)
+                            .collect(Collectors.toList());
+                    deleteDeltas.add(new DeleteDeltaInfo(location, 
deleteDeltaFileNames));
+                    continue;
+                }
+                remoteFiles.stream().filter(f -> 
fileFilter.accept(f.getName())).forEach(file -> {
+                    LocationPath path = new 
LocationPath(file.getPath().toString(), catalogProps);
+                    fileCacheValue.addFile(file, path);
+                });
+            } else {
+                throw new RuntimeException(status.getErrMsg());
+            }
+        }
+
+        // base
+        if (bestBasePath != null) {
+            List<RemoteFile> remoteFiles = new ArrayList<>();
+            status = fileSystem.listFiles(bestBasePath, false, remoteFiles);
+            if (status.ok()) {
+                remoteFiles.stream().filter(f -> 
fileFilter.accept(f.getName()))
+                        .forEach(file -> {
+                            LocationPath path = new 
LocationPath(file.getPath().toString(), catalogProps);
+                            fileCacheValue.addFile(file, path);
+                        });
+            } else {
+                throw new RuntimeException(status.getErrMsg());
+            }
+        }
+
+        if (isFullAcid) {
+            fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), 
deleteDeltas));
+        } else if (!deleteDeltas.isEmpty()) {
+            throw new RuntimeException("No Hive Full Acid Table have 
delete_delta_* Dir.");
+        }
+        return fileCacheValue;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index a5e0eefb348..fcaefecf41b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -23,7 +23,6 @@ import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
 
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -82,7 +81,7 @@ public interface HMSCachedClient {
 
     void commitTxn(long txnId);
 
-    ValidWriteIdList getValidWriteIds(String fullTableName, long 
currentTransactionId);
+    Map<String, String> getValidWriteIds(String fullTableName, long 
currentTransactionId);
 
     void acquireSharedLock(String queryId, long txnId, String user, TableName 
tblName,
             List<String> partitionNames, long timeoutMs);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 43eb9e2f573..b056af3ebd6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
@@ -360,19 +361,24 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     public boolean isHiveTransactionalTable() {
-        return dlaType == DLAType.HIVE && 
AcidUtils.isTransactionalTable(remoteTable)
-                && isSupportedTransactionalFileFormat();
+        return dlaType == DLAType.HIVE && 
AcidUtils.isTransactionalTable(remoteTable);
     }
 
-    private boolean isSupportedTransactionalFileFormat() {
+    private boolean isSupportedFullAcidTransactionalFileFormat() {
         // Sometimes we meet "transactional" = "true" but format is parquet, 
which is not supported.
         // So we need to check the input format for transactional table.
         String inputFormatName = remoteTable.getSd().getInputFormat();
         return inputFormatName != null && 
SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName);
     }
 
-    public boolean isFullAcidTable() {
-        return dlaType == DLAType.HIVE && 
AcidUtils.isFullAcidTable(remoteTable);
+    public boolean isFullAcidTable() throws UserException {
+        if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) 
{
+            if (!isSupportedFullAcidTransactionalFileFormat()) {
+                throw new UserException("This table is full Acid Table, but no 
Orc Format.");
+            }
+            return true;
+        }
+        return false;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 20d50dd875c..9bf63df6970 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.PartitionValue;
-import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -33,12 +32,12 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
-import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.fs.FileSystemCache;
@@ -60,7 +59,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -75,21 +73,16 @@ import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -109,10 +102,6 @@ import java.util.stream.Collectors;
 public class HiveMetaStoreCache {
     private static final Logger LOG = 
LogManager.getLogger(HiveMetaStoreCache.class);
     public static final String HIVE_DEFAULT_PARTITION = 
"__HIVE_DEFAULT_PARTITION__";
-    // After hive 3, transactional table's will have file '_orc_acid_version' 
with value >= '2'.
-    public static final String HIVE_ORC_ACID_VERSION_FILE = 
"_orc_acid_version";
-
-    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = 
"bucket_";
 
     private final HMSExternalCatalog catalog;
     private JobConf jobConf;
@@ -757,121 +746,32 @@ public class HiveMetaStoreCache {
         return partitionCache;
     }
 
-    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> 
partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, boolean skipCheckingAcidVersionFile, long 
tableId, String bindBrokerName) {
+    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> 
partitions, Map<String, String> txnValidIds,
+            boolean isFullAcid, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
-        String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
         try {
-            for (HivePartition partition : partitions) {
-                FileCacheValue fileCacheValue = new FileCacheValue();
-                AcidUtils.Directory directory;
-                if (!Strings.isNullOrEmpty(remoteUser)) {
-                    UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(remoteUser);
-                    directory = 
ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> 
AcidUtils.getAcidState(
-                            new Path(partition.getPath()), jobConf, 
validWriteIds, false, true));
-                } else {
-                    directory = AcidUtils.getAcidState(new 
Path(partition.getPath()), jobConf, validWriteIds, false,
-                            true);
-                }
-                if (directory == null) {
-                    return Collections.emptyList();
-                }
-                if (!directory.getOriginalFiles().isEmpty()) {
-                    throw new Exception("Original non-ACID files in 
transactional tables are not supported");
-                }
-
-                if (isFullAcid) {
-                    int acidVersion = 2;
-                    /**
-                     * From Hive version >= 3.0, delta/base files will always 
have file '_orc_acid_version'
-                     * with value >= '2'.
-                     */
-                    Path baseOrDeltaPath = directory.getBaseDirectory() != 
null ? directory.getBaseDirectory() :
-                            !directory.getCurrentDirectories().isEmpty() ? 
directory.getCurrentDirectories().get(0)
-                                    .getPath() : null;
-                    if (baseOrDeltaPath == null) {
-                        return Collections.emptyList();
-                    }
-                    if (!skipCheckingAcidVersionFile) {
-                        String acidVersionPath = new Path(baseOrDeltaPath, 
"_orc_acid_version").toUri().toString();
-                        RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                                new FileSystemCache.FileSystemCacheKey(
-                                        
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                                bindBrokerName),
-                                        
catalog.getCatalogProperty().getProperties(),
-                                        bindBrokerName, jobConf));
-                        Status status = fs.exists(acidVersionPath);
-                        if (status != Status.OK) {
-                            if (status.getErrCode() == ErrCode.NOT_FOUND) {
-                                acidVersion = 0;
-                            } else {
-                                throw new Exception(String.format("Failed to 
check remote path {} exists.",
-                                        acidVersionPath));
-                            }
-                        }
-                        if (acidVersion == 0 && 
!directory.getCurrentDirectories().isEmpty()) {
-                            throw new Exception(
-                                    "Hive 2.x versioned full-acid tables need 
to run major compaction.");
-                        }
-                    }
-                }
-
-                // delta directories
-                List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
-                for (AcidUtils.ParsedDelta delta : 
directory.getCurrentDirectories()) {
-                    String location = delta.getPath().toString();
-                    RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(
-                                    LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
-                    List<RemoteFile> remoteFiles = new ArrayList<>();
-                    Status status = fs.listFiles(location, false, remoteFiles);
-                    if (status.ok()) {
-                        if (delta.isDeleteDelta()) {
-                            List<String> deleteDeltaFileNames = 
remoteFiles.stream().map(f -> f.getName()).filter(
-                                            name -> 
name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                                    .collect(Collectors.toList());
-                            deleteDeltas.add(new DeleteDeltaInfo(location, 
deleteDeltaFileNames));
-                            continue;
-                        }
-                        remoteFiles.stream().filter(
-                                f -> 
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
-                                    LocationPath path = new 
LocationPath(file.getPath().toString(),
-                                            catalog.getProperties());
-                                    fileCacheValue.addFile(file, path);
-                                });
-                    } else {
-                        throw new RuntimeException(status.getErrMsg());
-                    }
-                }
+            if (partitions.isEmpty()) {
+                return fileCacheValues;
+            }
 
-                // base
-                if (directory.getBaseDirectory() != null) {
-                    String location = directory.getBaseDirectory().toString();
-                    RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(
-                                    LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
-                    List<RemoteFile> remoteFiles = new ArrayList<>();
-                    Status status = fs.listFiles(location, false, remoteFiles);
-                    if (status.ok()) {
-                        remoteFiles.stream().filter(
-                                        f -> 
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                                .forEach(file -> {
-                                    LocationPath path = new 
LocationPath(file.getPath().toString(),
-                                            catalog.getProperties());
-                                    fileCacheValue.addFile(file, path);
-                                });
-                    } else {
-                        throw new RuntimeException(status.getErrMsg());
-                    }
-                }
-                fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), 
deleteDeltas));
-                fileCacheValues.add(fileCacheValue);
+            for (HivePartition partition : partitions) {
+                // Get the filesystem per partition; see https://github.com/apache/doris/pull/23409 for the reason.
+                RemoteFileSystem fileSystem = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+                        new FileSystemCache.FileSystemCacheKey(
+                                
LocationPath.getFSIdentity(partition.getPath(), bindBrokerName),
+                                catalog.getCatalogProperty().getProperties(), 
bindBrokerName, jobConf));
+
+                AuthenticationConfig authenticationConfig = 
AuthenticationConfig.getKerberosConfig(jobConf);
+                HadoopAuthenticator hadoopAuthenticator =
+                        
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+
+                fileCacheValues.add(
+                        hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState(
+                        fileSystem, partition, txnValidIds, 
catalog.getProperties(), isFullAcid))
+                );
             }
         } catch (Exception e) {
-            throw new CacheException("failed to get input splits for write ids 
%s in catalog %s", e,
-                    validWriteIds.toString(), catalog.getName());
+            throw new CacheException("Failed to get input splits for txn valid ids %s", e, txnValidIds.toString());
         }
         return fileCacheValues;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 814b26246f9..d48bdfb5e3d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -179,6 +180,25 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
                     props.put("owner", 
ConnectContext.get().getUserIdentity().getQualifiedUser());
                 }
             }
+
+            if (props.containsKey("transactional") && 
props.get("transactional").equalsIgnoreCase("true")) {
+                throw new UserException("Not support create hive transactional 
table.");
+                /*
+                    CREATE TABLE trans6(
+                      `col1` int,
+                      `col2` int
+                    )  ENGINE=hive
+                    PROPERTIES (
+                      'file_format'='orc',
+                      'compression'='zlib',
+                      'bucketing_version'='2',
+                      'transactional'='true',
+                      'transactional_properties'='default'
+                    );
                    In Hive, such a table can only be inserted into, not updated (updates do not report an error,
                    but nothing is actually updated).
+                 */
+            }
+
             String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, 
Config.hive_default_file_format);
             Map<String, String> ddlProps = new HashMap<>();
             for (Map.Entry<String, String> entry : props.entrySet()) {
@@ -273,6 +293,10 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
                 ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, 
tblName, dbName);
             }
         }
+        if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) {
+            throw new DdlException("Dropping Hive transactional tables is not supported.");
+        }
+
         try {
             client.dropTable(dbName, tblName);
             db.setUnInitialized(true);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
index a88b1136369..7d62af29611 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
@@ -21,9 +21,9 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.common.UserException;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * HiveTransaction is used to save info of a hive transaction.
@@ -40,7 +40,7 @@ public class HiveTransaction {
     private long txnId;
     private List<String> partitionNames = Lists.newArrayList();
 
-    ValidWriteIdList validWriteIdList = null;
+    Map<String, String> txnValidIds = null;
 
     public HiveTransaction(String queryId, String user, HMSExternalTable 
hiveTable, boolean isFullAcid) {
         this.queryId = queryId;
@@ -61,14 +61,14 @@ public class HiveTransaction {
         return isFullAcid;
     }
 
-    public ValidWriteIdList getValidWriteIds(HMSCachedClient client) {
-        if (validWriteIdList == null) {
+    public Map<String, String> getValidWriteIds(HMSCachedClient client) {
+        if (txnValidIds == null) {
             TableName tableName = new 
TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
                     hiveTable.getName());
             client.acquireSharedLock(queryId, txnId, user, tableName, 
partitionNames, 5000);
-            validWriteIdList = client.getValidWriteIds(tableName.getDb() + "." 
+ tableName.getTbl(), txnId);
+            txnValidIds = client.getValidWriteIds(tableName.getDb() + "." + 
tableName.getTbl(), txnId);
         }
-        return validWriteIdList;
+        return txnValidIds;
     }
 
     public void begin() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
index 0cdb3e469c2..672afa96208 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -523,7 +522,7 @@ public class PostgreSQLJdbcHMSCachedClient extends 
JdbcHMSCachedClient {
     }
 
     @Override
-    public ValidWriteIdList getValidWriteIds(String fullTableName, long 
currentTransactionId) {
+    public Map<String, String> getValidWriteIds(String fullTableName, long 
currentTransactionId) {
         throw new HMSClientException("Do not support in 
PostgreSQLJdbcHMSCachedClient.");
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 56b69dc71e2..9137c977f17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -31,6 +31,7 @@ import 
com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -564,7 +565,8 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
     }
 
     @Override
-    public ValidWriteIdList getValidWriteIds(String fullTableName, long 
currentTransactionId) {
+    public Map<String, String> getValidWriteIds(String fullTableName, long 
currentTransactionId) {
+        Map<String, String> conf = new HashMap<>();
         try (ThriftHMSClient client = getClient()) {
             try {
                 return ugiDoAs(() -> {
@@ -581,7 +583,10 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
                     ValidTxnWriteIdList validTxnWriteIdList = 
TxnUtils.createValidTxnWriteIdList(currentTransactionId,
                             tableValidWriteIdsList);
                     ValidWriteIdList writeIdList = 
validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
-                    return writeIdList;
+
+                    conf.put(AcidUtil.VALID_TXNS_KEY, 
validTransactions.writeToString());
+                    conf.put(AcidUtil.VALID_WRITEIDS_KEY, 
writeIdList.writeToString());
+                    return conf;
                 });
             } catch (Exception e) {
                 client.setThrowable(e);
@@ -591,7 +596,14 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
             // Ignore this exception when the version of hive is not 
compatible with these apis.
             // Currently, the workaround is using a max watermark.
             LOG.warn("failed to get valid write ids for {}, transaction {}", 
fullTableName, currentTransactionId, e);
-            return new ValidReaderWriteIdList(fullTableName, new long[0], new 
BitSet(), Long.MAX_VALUE);
+
+            ValidTxnList validTransactions = new ValidReadTxnList(
+                    new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE);
+            ValidWriteIdList writeIdList = new ValidReaderWriteIdList(
+                    fullTableName, new long[0], new BitSet(), Long.MAX_VALUE);
+            conf.put(AcidUtil.VALID_TXNS_KEY, 
validTransactions.writeToString());
+            conf.put(AcidUtil.VALID_WRITEIDS_KEY, writeIdList.writeToString());
+            return conf;
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 2c227ffd1e3..d66a4c08e9d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -59,7 +59,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.Setter;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
@@ -273,7 +272,14 @@ public class HiveScanNode extends FileQueryScanNode {
             List<Split> allFiles, String bindBrokerName, int numBackends) 
throws IOException, UserException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions, 
bindBrokerName);
+            try {
+                fileCaches = getFileSplitByTransaction(cache, partitions, 
bindBrokerName);
+            } catch (Exception e) {
+                // Release the shared lock (getValidWriteIds acquires it).
+                // If no exception is thrown, the lock is released in `finalizeQuery()`.
+                
Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
+                throw e;
+            }
         } else {
             boolean withCache = Config.max_external_file_cache_num > 0;
             fileCaches = cache.getFilesByPartitions(partitions, withCache, 
partitions.size() > 1, bindBrokerName,
@@ -374,10 +380,10 @@ public class HiveScanNode extends FileQueryScanNode {
             }
             
hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns()));
         }
-        ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
+        Map<String, String> txnValidIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
-        return cache.getFilesByTransaction(partitions, validWriteIds,
-            hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, 
hmsTable.getId(), bindBrokerName);
+
+        return cache.getFilesByTransaction(partitions, txnValidIds, 
hiveTransaction.isFullAcid(), bindBrokerName);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 63a1712c242..82de92251de 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -275,6 +275,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         } else if (physicalSink instanceof PhysicalHiveTableSink) {
             boolean emptyInsert = childIsEmptyRelation(physicalSink);
             HMSExternalTable hiveExternalTable = (HMSExternalTable) 
targetTableIf;
+            if (hiveExternalTable.isHiveTransactionalTable()) {
+                throw new AnalysisException("Not supported insert into hive 
transactional table.");
+            }
             insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, 
label, planner,
                     Optional.of(insertCtx.orElse((new 
HiveInsertCommandContext()))), emptyInsert);
             // set hive query options
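
The guard above relies on HMSExternalTable.isHiveTransactionalTable(), which is not part
of this hunk. For reference, a minimal sketch of the usual metastore-level check, assuming
only the standard 'transactional' and 'transactional_properties' table parameters
(illustrative only, not necessarily how HMSExternalTable implements it):

    import org.apache.hadoop.hive.metastore.api.Table;

    import java.util.Map;

    final class AcidTableCheck {
        // Full-ACID and insert-only Hive tables both carry 'transactional'='true'
        // in their table parameters.
        static boolean isTransactional(Table table) {
            Map<String, String> params = table.getParameters();
            return params != null && "true".equalsIgnoreCase(params.get("transactional"));
        }

        // Insert-only tables additionally set 'transactional_properties'='insert_only'.
        static boolean isInsertOnly(Table table) {
            Map<String, String> params = table.getParameters();
            return isTransactional(table)
                    && "insert_only".equalsIgnoreCase(params.get("transactional_properties"));
        }
    }
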
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
index 12e66398210..5d4474d77ec 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/TestHMSCachedClient.java
@@ -28,7 +28,6 @@ import org.apache.doris.datasource.hive.HiveUtil;
 import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -202,7 +201,7 @@ public class TestHMSCachedClient implements HMSCachedClient 
{
     }
 
     @Override
-    public ValidWriteIdList getValidWriteIds(String fullTableName, long 
currentTransactionId) {
+    public Map<String, String> getValidWriteIds(String fullTableName, long 
currentTransactionId) {
         return null;
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java
new file mode 100644
index 00000000000..a54084e9b45
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveAcidTest.java
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
+import org.apache.doris.fs.LocalDfsFileSystem;
+
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+public class HiveAcidTest {
+
+    @Test
+    public void testOriginalDeltas() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/000000_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/000001_1");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/000002_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/random");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/_done");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/subdir/000000_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_025");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_029_029");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_030");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_050_100");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_101_101");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        HivePartition partition = new HivePartition(new SimpleTableInfo("", 
"tbl"),
+                false, "", "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(), new HashMap<>());
+        try {
+            AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, 
new HashMap<>(), true);
+        } catch (UnsupportedOperationException e) {
+            Assert.assertTrue(e.getMessage().contains("For no acid table 
convert to acid, please COMPACT 'major'."));
+        }
+    }
+
+
+    @Test
+    public void testObsoleteOriginals() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_10/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/000000_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/000001_1");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + 
":").writeToString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+        try {
+            AcidUtil.getAcidState(localDFSFileSystem, partition, txnValidIds, 
new HashMap<>(), true);
+        } catch (UnsupportedOperationException e) {
+            Assert.assertTrue(e.getMessage().contains("For no acid table 
convert to acid, please COMPACT 'major'."));
+        }
+    }
+
+
+    @Test
+    public void testOverlapingDelta() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0000063_63/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_000062_62/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_00061_61/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_40_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0060_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_052_55/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_50/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_00061_61/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_000062_62/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_0000063_63/bucket_0"
+        );
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+    }
+
+
+    @Test
+    public void testOverlapingDelta2() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0000063_63_0/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_000062_62_0/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_000062_62_3/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_00061_61_0/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_40_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0060_60_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0060_60_4/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0060_60_7/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_052_55/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_058_58/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_50/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_00061_61_0/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_000062_62_0/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_000062_62_3/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_0000063_63_0/bucket_0"
+
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+    }
+
+
+    @Test
+    public void deltasWithOpenTxnInRead() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_1_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_2_5/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 
55).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:4:4").writeToString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+
+    }
+
+
+    @Test
+    public void deltasWithOpenTxnInRead2() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_1_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_2_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_4_4_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_4_4_3/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_101_101_1/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 
55).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:4:4").writeToString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+    }
+
+    @Test
+    public void testBaseWithDeleteDeltas() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_10/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_49/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_025/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_029_029/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_029_029/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_030/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_025_030/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_050_105/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_050_105/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_110_110/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        // Map<String, String> tableProps = new HashMap<>();
+        // 
tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        //         
AcidUtils.AcidOperationalProperties.getDefault().toString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                new HashMap<>());
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+
+        List<String> resultDelta = Arrays.asList(
+                "file://" + tempPath.toAbsolutePath() + 
"/delete_delta_050_105/bucket_0"
+        );
+
+        List<String> deltaFiles = new ArrayList<>();
+        fileCacheValue.getAcidInfo().getDeleteDeltas().forEach(
+                deltaInfo -> {
+                    String loc = deltaInfo.getDirectoryLocation();
+                    deltaInfo.getFileNames().forEach(
+                            fileName -> deltaFiles.add(loc + "/" + fileName)
+                    );
+                }
+        );
+
+        Assert.assertTrue(resultDelta.containsAll(deltaFiles) && 
deltaFiles.containsAll(resultDelta));
+    }
+
+
+    @Test
+    public void testOverlapingDeltaAndDeleteDelta() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0000063_63/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_000062_62/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_00061_61/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_00064_64/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_40_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_40_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_0060_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_052_55/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_052_55/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_50/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+
+        Map<String, String> tableProps = new HashMap<>();
+        // 
tableProps.put(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        //         
AcidUtils.AcidOperationalProperties.getDefault().toString());
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                tableProps);
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/base_50/bucket_0",
+
+                "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_00061_61/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_000062_62/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + 
"/delta_0000063_63/bucket_0"
+
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+
+        List<String> resultDelta = Arrays.asList(
+
+                "file://" + tempPath.toAbsolutePath() + 
"/delete_delta_40_60/bucket_0",
+                "file://" + tempPath.toAbsolutePath() + 
"/delete_delta_00064_64/bucket_0"
+        );
+
+        List<String> deltaFiles = new ArrayList<>();
+        fileCacheValue.getAcidInfo().getDeleteDeltas().forEach(
+                deltaInfo -> {
+                    String loc = deltaInfo.getDirectoryLocation();
+                    deltaInfo.getFileNames().forEach(
+                            fileName -> deltaFiles.add(loc + "/" + fileName)
+                    );
+                }
+        );
+
+        Assert.assertTrue(resultDelta.containsAll(deltaFiles) && 
deltaFiles.containsAll(resultDelta));
+    }
+
+    @Test
+    public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() 
throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_40_60/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_50_50/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        Map<String, String> tableProps = new HashMap<>();
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                tableProps);
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/delta_40_60/bucket_0"
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+    }
+
+    @Test
+    public void deleteDeltasWithOpenTxnInRead() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_1_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_2_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_2_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delete_delta_3_3/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_4_4_1/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_4_4_3/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_101_101_1/bucket_0");
+
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 
55).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:4:4").writeToString());
+
+
+        Map<String, String> tableProps = new HashMap<>();
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                tableProps);
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/delta_1_1/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_2_5/bucket_0"
+        );
+
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+        List<String> resultDelta = Arrays.asList(
+                "file://" + tempPath.toAbsolutePath() + 
"/delete_delta_2_5/bucket_0"
+        );
+
+        List<String> deltaFiles = new ArrayList<>();
+        fileCacheValue.getAcidInfo().getDeleteDeltas().forEach(
+                deltaInfo -> {
+                    String loc = deltaInfo.getDirectoryLocation();
+                    deltaInfo.getFileNames().forEach(
+                            fileName -> deltaFiles.add(loc + "/" + fileName)
+                    );
+                }
+        );
+
+        Assert.assertTrue(resultDelta.containsAll(deltaFiles) && 
deltaFiles.containsAll(resultDelta));
+    }
+
+    @Test
+    public void testBaseDeltas() throws Exception {
+        LocalDfsFileSystem localDFSFileSystem = new LocalDfsFileSystem();
+        Path tempPath = Files.createTempDirectory("tbl");
+
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_5/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_10/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/base_49/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_025/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_029_029/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_025_030/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_050_105/bucket_0");
+        localDFSFileSystem.createFile("file://" + tempPath.toAbsolutePath() + 
"/delta_90_120/bucket_0");
+
+        Map<String, String> txnValidIds = new HashMap<>();
+        txnValidIds.put(
+                AcidUtil.VALID_TXNS_KEY,
+                new ValidReadTxnList(new long[0], new BitSet(), 1000, 
Long.MAX_VALUE).writeToString());
+        txnValidIds.put(
+                AcidUtil.VALID_WRITEIDS_KEY,
+                new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + 
":").writeToString());
+
+        Map<String, String> tableProps = new HashMap<>();
+
+        HivePartition partition = new HivePartition(
+                new SimpleTableInfo("", "tbl"),
+                false,
+                "",
+                "file://" + tempPath.toAbsolutePath() + "",
+                new ArrayList<>(),
+                tableProps);
+
+        FileCacheValue fileCacheValue =
+                AcidUtil.getAcidState(localDFSFileSystem, partition, 
txnValidIds, new HashMap<>(), true);
+
+        List<String> readFile =
+                fileCacheValue.getFiles().stream().map(x -> 
x.path.toString()).collect(Collectors.toList());
+
+
+
+        List<String> resultReadFile = Arrays.asList(
+                "file:" + tempPath.toAbsolutePath() + "/base_49/bucket_0",
+                "file:" + tempPath.toAbsolutePath() + "/delta_050_105/bucket_0"
+        );
+        Assert.assertTrue(resultReadFile.containsAll(readFile) && 
readFile.containsAll(resultReadFile));
+    }
+}
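
The directory names exercised by these tests follow the Hive ACID layout: base_<writeId>,
delta_<minWriteId>_<maxWriteId> (optionally suffixed with a statement id), and the
delete_delta_* variants. A small, illustrative parser for that naming convention; the
patch's real directory selection lives in AcidUtil.getAcidState(), and the class below is
made up purely for the example:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class AcidDirName {
        // Matches delta_<min>_<max> and delta_<min>_<max>_<stmtId>, with an optional
        // "delete_" prefix; base directories are simply base_<writeId>.
        private static final Pattern DELTA =
                Pattern.compile("^(delete_)?delta_(\\d+)_(\\d+)(?:_(\\d+))?$");

        static String describe(String dirName) {
            Matcher m = DELTA.matcher(dirName);
            if (!m.matches()) {
                return dirName + ": not a delta directory";
            }
            boolean isDelete = m.group(1) != null;
            long min = Long.parseLong(m.group(2));
            long max = Long.parseLong(m.group(3));
            String stmtId = m.group(4) == null ? "none" : m.group(4);
            return String.format("%s: %sdelta, writeIds [%d, %d], statementId=%s",
                    dirName, isDelete ? "delete " : "", min, max, stmtId);
        }

        public static void main(String[] args) {
            System.out.println(describe("delta_000062_62_3"));  // writeIds [62, 62], stmt 3
            System.out.println(describe("delete_delta_40_60")); // delete delta, [40, 60]
            System.out.println(describe("base_50"));            // not a delta directory
        }
    }

Zero-padded and unpadded write ids (delta_0000063_63 vs delta_40_60) denote the same kind
of numeric range, which is why the tests above mix both spellings freely.
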
diff --git 
a/regression-test/data/external_table_p0/hive/test_transactional_hive.out 
b/regression-test/data/external_table_p0/hive/test_transactional_hive.out
index 2ad0446ccb7..060fa8c048e 100644
Binary files 
a/regression-test/data/external_table_p0/hive/test_transactional_hive.out and 
b/regression-test/data/external_table_p0/hive/test_transactional_hive.out differ
diff --git 
a/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out
 
b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out
new file mode 100644
index 00000000000..e4bdb3fe32d
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hive/test_hive_translation_insert_only.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy 
b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
index dbe20395ec9..4f7008ec172 100644
--- 
a/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
+++ 
b/regression-test/suites/external_table_p0/hive/test_transactional_hive.groovy
@@ -52,6 +52,70 @@ suite("test_transactional_hive", 
"p0,external,hive,external_docker,external_dock
         select count(*) from orc_full_acid_par_empty;
         """
     }
+
+    def test_acid = {
+        
+        sql """set enable_fallback_to_original_planner=false;"""
+        try {
+            sql """ select * from orc_to_acid_tb """ 
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("For no acid table convert to 
acid, please COMPACT"));
+        }
+
+        qt_2 """ select * from orc_to_acid_compacted_tb order by id """
+        qt_3 """ select * from orc_to_acid_compacted_tb where part_col=101 
order by id """
+        qt_4 """ select * from orc_to_acid_compacted_tb where part_col=102 
order by id """
+        qt_5 """ select * from orc_to_acid_compacted_tb where id < 3 order by 
id """
+        qt_6 """ select * from orc_to_acid_compacted_tb where id > 3 order by 
id """
+
+
+        qt_7 """ select * from orc_acid_minor order by id """
+        qt_10 """ select * from orc_acid_minor where id < 3 order by id """
+        qt_11 """ select * from orc_acid_minor where id > 3 order by id """
+
+
+        qt_12 """ select * from orc_acid_major order by id """
+        qt_15 """ select * from orc_acid_major where id < 3 order by id """
+        qt_16 """ select * from orc_acid_major where id > 3 order by id """
+    }
+
+    def test_acid_write = {
+        sql """set enable_fallback_to_original_planner=false;"""
+        
+
+
+        try {
+            sql """ 
+                CREATE TABLE acid_tb (
+                `col1` BOOLEAN COMMENT 'col1',
+                `col2` INT COMMENT 'col2'
+                )  ENGINE=hive
+                PROPERTIES (
+                'file_format'='orc',
+                'compression'='zlib',
+                'bucketing_version'='2',
+                'transactional'='true',
+                'transactional_properties'='default'
+                );
+            """
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Not support create hive 
transactional table."));
+        }
+        try { 
+            sql """ insert into orc_acid_major(id,value) values(1,"a1"); """
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Not supported insert into hive 
transactional table."));
+        }
+        
+        try { 
+            sql """ drop table orc_acid_major; """
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Not support drop hive 
transactional table."));
+
+        }
+    }
+
+
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled == null || !enabled.equalsIgnoreCase("true")) {
         logger.info("diable Hive test.")
@@ -60,6 +124,7 @@ suite("test_transactional_hive", 
"p0,external,hive,external_docker,external_dock
 
     for (String hivePrefix : ["hive3"]) {
         try {
+            String hdfs_port = context.config.otherConfigs.get(hivePrefix + 
"HdfsPort")
             String hms_port = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
             String catalog_name = "test_transactional_${hivePrefix}"
             String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
@@ -68,6 +133,7 @@ suite("test_transactional_hive", 
"p0,external,hive,external_docker,external_dock
             sql """create catalog if not exists ${catalog_name} properties (
                 "type"="hms",
                 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+                ,'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}'
             );"""
             sql """use `${catalog_name}`.`default`"""
 
@@ -79,6 +145,9 @@ suite("test_transactional_hive", 
"p0,external,hive,external_docker,external_dock
             q01()
             q01_par()
 
+            test_acid()
+            test_acid_write()
+
             sql """drop catalog if exists ${catalog_name}"""
         } finally {
         }
diff --git 
a/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy
 
b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy
new file mode 100644
index 00000000000..62a9727ed86
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hive/test_hive_translation_insert_only.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_translation_insert_only", 
"p2,external,hive,external_remote,external_remote_hive") {
+
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    // Hudi and Hive share the same catalog in p2.
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable test")
+        return
+    }
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")    
+    String hms_catalog_name = "test_hive_translation_insert_only"
+
+    sql """drop catalog if exists ${hms_catalog_name};"""
+    sql """
+        CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+        PROPERTIES ( 
+            ${props}
+            ,'hive.version' = '3.1.3'
+        );
+    """
+
+    logger.info("catalog " + hms_catalog_name + " created")
+    sql """switch ${hms_catalog_name};"""
+    logger.info("switched to catalog " + hms_catalog_name)
+    sql """ use regression;"""
+
+    qt_1 """ select * from text_insert_only order by id """ 
+    qt_2 """ select * from parquet_insert_only_major order by id """ 
+    qt_3 """ select * from orc_insert_only_minor order by id """ 
+
+    sql """drop catalog ${hms_catalog_name};"""
+}
+
+
+/*
+SET hive.support.concurrency=true;
+SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+
+create table text_insert_only (id INT, value STRING)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into text_insert_only values (1, 'A');
+insert into text_insert_only values (2, 'B');
+insert into text_insert_only values (3, 'C');
+Load data xxx    (4,D)
+
+
+create table parquet_insert_only_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS parquet
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into parquet_insert_only_major values (1, 'A');
+insert into parquet_insert_only_major values (2, 'B');
+insert into parquet_insert_only_major values (3, 'C');
+ALTER TABLE parquet_insert_only_major COMPACT 'major';
+insert into parquet_insert_only_major values (4, 'D');
+insert into parquet_insert_only_major values (5, 'E');
+
+
+create table orc_insert_only_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+stored as orc
+TBLPROPERTIES ('transactional' = 'true',
+'transactional_properties'='insert_only');
+insert into orc_insert_only_minor values (1, 'A');
+insert into orc_insert_only_minor values (2, 'B');
+insert into orc_insert_only_minor values (3, 'C');
+ALTER TABLE orc_insert_only_minor COMPACT 'minor';
+insert into orc_insert_only_minor values (4, 'D');
+insert into orc_insert_only_minor values (5, 'E');
+
+*/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

