This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c1d4af2dc89 branch-3.1: [support](orc)support orc file meta cache. 
#54591 (#55584)
c1d4af2dc89 is described below

commit c1d4af2dc895c9c19c926ae07f3eebc14f262284
Author: daidai <[email protected]>
AuthorDate: Thu Sep 4 10:42:39 2025 +0800

    branch-3.1: [support](orc)support orc file meta cache. #54591 (#55584)
    
    bp #54591
---
 be/src/io/fs/file_meta_cache.cpp                   |  31 +++--
 be/src/io/fs/file_meta_cache.h                     |  20 ++-
 be/src/olap/push_handler.cpp                       |   1 +
 be/src/util/obj_lru_cache.cpp                      |   5 +-
 be/src/util/obj_lru_cache.h                        |  12 +-
 be/src/vec/exec/format/generic_reader.h            |   5 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  89 ++++++++----
 be/src/vec/exec/format/orc/vorc_reader.h           |  11 +-
 .../vec/exec/format/parquet/parquet_thrift_util.h  |   7 +-
 be/src/vec/exec/format/parquet/schema_desc.cpp     |  66 ---------
 be/src/vec/exec/format/parquet/schema_desc.h       |   4 -
 .../exec/format/parquet/vparquet_column_reader.h   |   8 +-
 .../exec/format/parquet/vparquet_file_metadata.cpp |   2 +-
 .../exec/format/parquet/vparquet_file_metadata.h   |   5 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  61 ++++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  18 ++-
 be/src/vec/exec/format/table/hive_reader.cpp       |  26 +++-
 be/src/vec/exec/format/table/hive_reader.h         |  22 ++-
 be/src/vec/exec/format/table/hudi_reader.h         |  14 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  10 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |  23 +--
 be/src/vec/exec/format/table/paimon_reader.cpp     |   5 +-
 be/src/vec/exec/format/table/paimon_reader.h       |  13 +-
 .../vec/exec/format/table/table_format_reader.cpp  |  14 +-
 be/src/vec/exec/format/table/table_format_reader.h |   9 +-
 .../format/table/transactional_hive_reader.cpp     |   8 +-
 .../exec/format/table/transactional_hive_reader.h  |   2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  52 ++++---
 be/src/vec/exec/scan/vfile_scanner.h               |   4 +-
 .../format/file_reader/file_meta_cache_test.cpp    | 154 +++++++++++++++++++++
 .../exec/format/parquet/parquet_reader_test.cpp    |   9 +-
 .../exec/format/parquet/parquet_thrift_test.cpp    |  10 +-
 be/test/vec/exec/orc/orc_convert_dict_test.cpp     |  10 +-
 be/test/vec/exec/orc/orc_reader_fill_data_test.cpp |  12 +-
 .../vec/exec/orc/orc_reader_init_column_test.cpp   |   6 +-
 be/test/vec/exec/orc_reader_test.cpp               |   7 +-
 .../create_preinstalled_scripts/iceberg/run19.sql  |  42 +++++-
 .../hive/test_external_catalog_hive.out            | Bin 9227 -> 10547 bytes
 .../hive/test_file_meta_cache.out                  | Bin 0 -> 383 bytes
 .../iceberg/test_iceberg_invaild_avro_name.out     | Bin 0 -> 517 bytes
 .../hive/test_external_catalog_hive.groovy         |  18 +++
 .../hive/test_file_meta_cache.groovy               |  81 +++++++++++
 .../iceberg/test_iceberg_invaild_avro_name.groovy  |  67 +++++++++
 43 files changed, 694 insertions(+), 269 deletions(-)

diff --git a/be/src/io/fs/file_meta_cache.cpp b/be/src/io/fs/file_meta_cache.cpp
index 226fb663f5b..f97b2f80cd6 100644
--- a/be/src/io/fs/file_meta_cache.cpp
+++ b/be/src/io/fs/file_meta_cache.cpp
@@ -17,26 +17,27 @@
 
 #include "io/fs/file_meta_cache.h"
 
-#include "vec/exec/format/parquet/parquet_thrift_util.h"
-
 namespace doris {
 
-Status FileMetaCache::get_parquet_footer(io::FileReaderSPtr file_reader, 
io::IOContext* io_ctx,
-                                         int64_t mtime, size_t* meta_size,
-                                         ObjLRUCache::CacheHandle* handle) {
-    ObjLRUCache::CacheHandle cache_handle;
-    std::string cache_key = file_reader->path().native() + 
std::to_string(mtime);
-    auto hit_cache = _cache.lookup({cache_key}, &cache_handle);
-    if (hit_cache) {
-        *handle = std::move(cache_handle);
-        *meta_size = 0;
+std::string FileMetaCache::get_key(const std::string file_name, int64_t 
modification_time,
+                                   int64_t file_size) {
+    std::string meta_cache_key;
+    meta_cache_key.resize(file_name.size() + sizeof(int64_t));
+
+    memcpy(meta_cache_key.data(), file_name.data(), file_name.size());
+    if (modification_time != 0) {
+        memcpy(meta_cache_key.data() + file_name.size(), &modification_time, 
sizeof(int64_t));
     } else {
-        vectorized::FileMetaData* meta = nullptr;
-        RETURN_IF_ERROR(vectorized::parse_thrift_footer(file_reader, &meta, 
meta_size, io_ctx));
-        _cache.insert({cache_key}, meta, handle);
+        memcpy(meta_cache_key.data() + file_name.size(), &file_size, 
sizeof(int64_t));
     }
+    return meta_cache_key;
+}
 
-    return Status::OK();
+std::string FileMetaCache::get_key(io::FileReaderSPtr file_reader,
+                                   const io::FileDescription& 
_file_description) {
+    return FileMetaCache::get_key(
+            file_reader->path().native(), _file_description.mtime,
+            _file_description.file_size == -1 ? file_reader->size() : 
_file_description.file_size);
 }
 
 } // namespace doris
diff --git a/be/src/io/fs/file_meta_cache.h b/be/src/io/fs/file_meta_cache.h
index 5d3384677ee..0c62c963ce1 100644
--- a/be/src/io/fs/file_meta_cache.h
+++ b/be/src/io/fs/file_meta_cache.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "io/file_factory.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "util/obj_lru_cache.h"
 
@@ -34,14 +35,23 @@ public:
 
     ObjLRUCache& cache() { return _cache; }
 
-    Status get_parquet_footer(io::FileReaderSPtr file_reader, io::IOContext* 
io_ctx, int64_t mtime,
-                              size_t* meta_size, ObjLRUCache::CacheHandle* 
handle);
+    static std::string get_key(const std::string file_name, int64_t 
modification_time,
+                               int64_t file_size);
 
-    Status get_orc_footer() {
-        // TODO: implement
-        return Status::OK();
+    static std::string get_key(io::FileReaderSPtr file_reader,
+                               const io::FileDescription& _file_description);
+
+    bool lookup(const std::string& key, ObjLRUCache::CacheHandle* handle) {
+        return _cache.lookup({key}, handle);
+    }
+
+    template <typename T>
+    void insert(const std::string& key, T* value, ObjLRUCache::CacheHandle* 
handle) {
+        _cache.insert({key}, value, handle);
     }
 
+    bool enabled() const { return _cache.enabled(); }
+
 private:
     ObjLRUCache _cache;
 };
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 051eac147fa..e97639b7d95 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -40,6 +40,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/fs/file_meta_cache.h"
 #include "io/hdfs_builder.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/delete_handler.h"
diff --git a/be/src/util/obj_lru_cache.cpp b/be/src/util/obj_lru_cache.cpp
index 600ffdb647c..fb03f1907d3 100644
--- a/be/src/util/obj_lru_cache.cpp
+++ b/be/src/util/obj_lru_cache.cpp
@@ -22,9 +22,8 @@ namespace doris {
 ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards)
         : LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, 
capacity,
                          LRUCacheType::NUMBER, 
config::common_obj_lru_cache_stale_sweep_time_sec,
-                         num_shards) {
-    _enabled = (capacity > 0);
-}
+                         num_shards),
+          _enabled(capacity > 0) {}
 
 bool ObjLRUCache::lookup(const ObjKey& key, CacheHandle* handle) {
     if (!_enabled) {
diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h
index 680a32e79bc..a62378a14bf 100644
--- a/be/src/util/obj_lru_cache.h
+++ b/be/src/util/obj_lru_cache.h
@@ -72,9 +72,10 @@ public:
         bool valid() { return _cache != nullptr && _handle != nullptr; }
 
         LRUCachePolicy* cache() const { return _cache; }
+
         template <typename T>
-        void* data() const {
-            return (void*)((ObjValue<T>*)_cache->value(_handle))->value;
+        const T* data() const {
+            return ((ObjValue<T>*)_cache->value(_handle))->value;
         }
 
     private:
@@ -98,7 +99,8 @@ public:
                                                   CachePriority::NORMAL);
             *cache_handle = CacheHandle {this, handle};
         } else {
-            cache_handle = nullptr;
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "ObjLRUCache disable, can not insert.");
         }
     }
 
@@ -106,8 +108,10 @@ public:
 
     bool exceed_prune_limit() override;
 
+    bool enabled() const { return _enabled; }
+
 private:
-    bool _enabled;
+    const bool _enabled;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index c3efc321e2f..d12107a4b88 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -20,6 +20,7 @@
 #include <gen_cpp/PlanNodes_types.h>
 
 #include "common/status.h"
+#include "io/fs/file_meta_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/types.h"
 #include "util/profile_collector.h"
@@ -85,6 +86,10 @@ protected:
     /// Whether the underlying FileReader has filled the partition&missing 
columns
     bool _fill_all_columns = false;
     TPushAggOp::type _push_down_agg_type {};
+
+    // Cache to save some common part such as file footer.
+    // Maybe null if not used
+    FileMetaCache* _meta_cache = nullptr;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 741399ff7dc..2e60dcfb6c5 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -165,7 +165,7 @@ void StripeStreamInputStream::read(void* buf, uint64_t 
length, uint64_t offset)
 OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                      const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                      size_t batch_size, const std::string& ctz, io::IOContext* 
io_ctx,
-                     bool enable_lazy_mat)
+                     FileMetaCache* meta_cache, bool enable_lazy_mat)
         : _profile(profile),
           _state(state),
           _scan_params(params),
@@ -183,13 +183,15 @@ OrcReader::OrcReader(RuntimeProfile* profile, 
RuntimeState* state,
     VecDateTimeValue t;
     t.from_unixtime(0, ctz);
     _offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
+    _meta_cache = meta_cache;
     _init_profile();
     _init_system_properties();
     _init_file_description();
 }
 
 OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
-                     const std::string& ctz, io::IOContext* io_ctx, bool 
enable_lazy_mat)
+                     const std::string& ctz, io::IOContext* io_ctx, 
FileMetaCache* meta_cache,
+                     bool enable_lazy_mat)
         : _profile(nullptr),
           _scan_params(params),
           _scan_range(range),
@@ -199,6 +201,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
           _enable_lazy_mat(enable_lazy_mat),
           _enable_filter_by_min_max(true),
           _dict_cols_has_converted(false) {
+    _meta_cache = meta_cache;
     _init_system_properties();
     _init_file_description();
 }
@@ -221,7 +224,8 @@ void OrcReader::_collect_profile_before_close() {
         COUNTER_UPDATE(_orc_profile.predicate_filter_time, 
_statistics.predicate_filter_time);
         COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, 
_statistics.dict_filter_rewrite_time);
         COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, 
_statistics.lazy_read_filtered_rows);
-
+        COUNTER_UPDATE(_orc_profile.file_footer_read_calls, 
_statistics.file_footer_read_calls);
+        COUNTER_UPDATE(_orc_profile.file_footer_hit_cache, 
_statistics.file_footer_hit_cache);
         if (_file_input_stream != nullptr) {
             _file_input_stream->collect_profile_before_close();
         }
@@ -260,10 +264,15 @@ void OrcReader::_init_profile() {
                 ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", 
TUnit::UNIT, 1);
         _orc_profile.evaluated_row_group_count =
                 ADD_COUNTER_WITH_LEVEL(_profile, "EvaluatedRowGroupCount", 
TUnit::UNIT, 1);
+        _orc_profile.file_footer_read_calls =
+                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", 
TUnit::UNIT, 1);
+        _orc_profile.file_footer_hit_cache =
+                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", 
TUnit::UNIT, 1);
     }
 }
 
 Status OrcReader::_create_file_reader() {
+    SCOPED_RAW_TIMER(&_statistics.create_reader_time);
     if (_reader != nullptr) {
         return Status::OK();
     }
@@ -283,27 +292,56 @@ Status OrcReader::_create_file_reader() {
     if (_file_input_stream->getLength() == 0) {
         return Status::EndOfFile("empty orc file: " + _scan_range.path);
     }
+
     // create orc reader
-    try {
-        orc::ReaderOptions options;
-        options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
-        options.setReaderMetrics(&_reader_metrics);
-        _reader = orc::createReader(
-                
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
-    } catch (std::exception& e) {
-        // invoker maybe just skip Status.NotFound and continue
-        // so we need distinguish between it and other kinds of errors
-        std::string _err_msg = e.what();
-        if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
-            return Status::EndOfFile("stop");
+    orc::ReaderOptions options;
+    options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
+    options.setReaderMetrics(&_reader_metrics);
+
+    auto create_orc_reader = [&]() {
+        try {
+            _reader = orc::createReader(
+                    
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
+        } catch (std::exception& e) {
+            // invoker maybe just skip Status.NotFound and continue
+            // so we need distinguish between it and other kinds of errors
+            std::string _err_msg = e.what();
+            if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
+                return Status::EndOfFile("stop");
+            }
+            // one for fs, the other is for oss.
+            if (_err_msg.find("No such file or directory") != 
std::string::npos ||
+                _err_msg.find("NoSuchKey") != std::string::npos) {
+                return Status::NotFound(_err_msg);
+            }
+            return Status::InternalError("Init OrcReader failed. reason = {}", 
_err_msg);
         }
-        // one for fs, the other is for oss.
-        if (_err_msg.find("No such file or directory") != std::string::npos ||
-            _err_msg.find("NoSuchKey") != std::string::npos) {
-            return Status::NotFound(_err_msg);
+        return Status::OK();
+    };
+
+    if (_meta_cache == nullptr) {
+        _statistics.file_footer_read_calls++;
+        RETURN_IF_ERROR(create_orc_reader());
+    } else {
+        auto inner_file_reader = _file_input_stream->get_inner_reader();
+        const auto& file_meta_cache_key =
+                FileMetaCache::get_key(inner_file_reader, _file_description);
+
+        // Local variables can be required because setSerializedFileTail is an 
assignment operation, not a reference.
+        ObjLRUCache::CacheHandle _meta_cache_handle;
+        if (_meta_cache->lookup(file_meta_cache_key, &_meta_cache_handle)) {
+            const std::string* footer_ptr = _meta_cache_handle.data<String>();
+            options.setSerializedFileTail(*footer_ptr);
+            RETURN_IF_ERROR(create_orc_reader());
+            _statistics.file_footer_hit_cache++;
+        } else {
+            _statistics.file_footer_read_calls++;
+            RETURN_IF_ERROR(create_orc_reader());
+            std::string* footer_ptr = new std::string 
{_reader->getSerializedFileTail()};
+            _meta_cache->insert(file_meta_cache_key, footer_ptr, 
&_meta_cache_handle);
         }
-        return Status::InternalError("Init OrcReader failed. reason = {}", 
_err_msg);
     }
+
     return Status::OK();
 }
 
@@ -337,14 +375,8 @@ Status OrcReader::init_reader(
         _orc_max_merge_distance_bytes = 
_state->query_options().orc_max_merge_distance_bytes;
     }
 
-    {
-        SCOPED_RAW_TIMER(&_statistics.create_reader_time);
-        RETURN_IF_ERROR(_create_file_reader());
-    }
-    {
-        SCOPED_RAW_TIMER(&_statistics.init_column_time);
-        RETURN_IF_ERROR(_init_read_columns());
-    }
+    RETURN_IF_ERROR(_create_file_reader());
+    RETURN_IF_ERROR(_init_read_columns());
     return Status::OK();
 }
 
@@ -364,6 +396,7 @@ Status 
OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 }
 
 Status OrcReader::_init_read_columns() {
+    SCOPED_RAW_TIMER(&_statistics.init_column_time);
     const auto& root_type = _reader->getType();
     if (_is_acid) {
         for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index febb7202857..82f8693cc31 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -36,6 +36,7 @@
 #include "exec/olap_common.h"
 #include "io/file_factory.h"
 #include "io/fs/buffered_reader.h"
+#include "io/fs/file_meta_cache.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/tracing_file_reader.h"
@@ -132,14 +133,18 @@ public:
         int64_t predicate_filter_time = 0;
         int64_t dict_filter_rewrite_time = 0;
         int64_t lazy_read_filtered_rows = 0;
+        int64_t file_footer_read_calls = 0;
+        int64_t file_footer_hit_cache = 0;
     };
 
     OrcReader(RuntimeProfile* profile, RuntimeState* state, const 
TFileScanRangeParams& params,
               const TFileRangeDesc& range, size_t batch_size, const 
std::string& ctz,
-              io::IOContext* io_ctx, bool enable_lazy_mat = true);
+              io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr,
+              bool enable_lazy_mat = true);
 
     OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-              const std::string& ctz, io::IOContext* io_ctx, bool 
enable_lazy_mat = true);
+              const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* 
meta_cache = nullptr,
+              bool enable_lazy_mat = true);
 
     ~OrcReader() override;
     //If you want to read the file by index instead of column name, set 
hive_use_column_names to false.
@@ -240,6 +245,8 @@ private:
         RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
         RuntimeProfile::Counter* selected_row_group_count = nullptr;
         RuntimeProfile::Counter* evaluated_row_group_count = nullptr;
+        RuntimeProfile::Counter* file_footer_read_calls = nullptr;
+        RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
     };
 
     class ORCFilterImpl : public orc::ORCFilter {
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h 
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index b8475ffa989..d27b9952f77 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -36,8 +36,9 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', 
'1'};
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k
 
-static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** 
file_metadata,
-                                  size_t* meta_size, io::IOContext* io_ctx) {
+static Status parse_thrift_footer(io::FileReaderSPtr file,
+                                  std::unique_ptr<FileMetaData>* 
file_metadata, size_t* meta_size,
+                                  io::IOContext* io_ctx) {
     size_t file_size = file->size();
     size_t bytes_read = std::min(file_size, INIT_META_SIZE);
     std::vector<uint8_t> footer(bytes_read);
@@ -78,7 +79,7 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, 
FileMetaData** file_m
     tparquet::FileMetaData t_metadata;
     // deserialize footer
     RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, 
&t_metadata));
-    *file_metadata = new FileMetaData(t_metadata, metadata_size);
+    *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
     RETURN_IF_ERROR((*file_metadata)->init_schema());
     *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp 
b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 1f72a1fda5b..706dfee4e7c 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -247,72 +247,6 @@ std::pair<TypeDescriptor, bool> 
FieldDescriptor::get_doris_type(
     return ans;
 }
 
-// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
-static bool is_valid_avro_name(const std::string& name) {
-    int length = name.length();
-    char first = name[0];
-    if (!isalpha(first) && first != '_') {
-        return false;
-    }
-
-    for (int i = 1; i < length; i++) {
-        char character = name[i];
-        if (!isalpha(character) && !isdigit(character) && character != '_') {
-            return false;
-        }
-    }
-    return true;
-}
-
-// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
-static void sanitize_avro_name(std::ostringstream& buf, char character) {
-    if (isdigit(character)) {
-        buf << '_' << character;
-    } else {
-        std::stringstream ss;
-        ss << std::hex << (int)character;
-        std::string hex_str = ss.str();
-        buf << "_x" << doris::to_lower(hex_str);
-    }
-}
-
-// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
-static std::string sanitize_avro_name(const std::string& name) {
-    std::ostringstream buf;
-    int length = name.length();
-    char first = name[0];
-    if (!isalpha(first) && first != '_') {
-        sanitize_avro_name(buf, first);
-    } else {
-        buf << first;
-    }
-
-    for (int i = 1; i < length; i++) {
-        char character = name[i];
-        if (!isalpha(character) && !isdigit(character) && character != '_') {
-            sanitize_avro_name(buf, character);
-        } else {
-            buf << character;
-        }
-    }
-    return buf.str();
-}
-
-void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& 
read_columns) {
-    for (const std::string& col : read_columns) {
-        if (!is_valid_avro_name(col)) {
-            std::string sanitize_name = sanitize_avro_name(col);
-            auto it = _name_to_field.find(sanitize_name);
-            if (it != _name_to_field.end()) {
-                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
-                schema->name = col;
-                _name_to_field.emplace(col, schema);
-                _name_to_field.erase(sanitize_name);
-            }
-        }
-    }
-}
-
 std::pair<TypeDescriptor, bool> FieldDescriptor::convert_to_doris_type(
         tparquet::LogicalType logicalType) {
     std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h 
b/be/src/vec/exec/format/parquet/schema_desc.h
index 408a45eae4b..16d3c1cc16c 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -100,10 +100,6 @@ private:
 public:
     std::pair<TypeDescriptor, bool> get_doris_type(const 
tparquet::SchemaElement& physical_schema);
 
-    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special 
characters,
-    // we have to decode these characters
-    void iceberg_sanitize(const std::vector<std::string>& read_columns);
-
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 1d2c2afe563..619f87cad97 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -58,7 +58,7 @@ public:
         Statistics()
                 : read_time(0),
                   read_calls(0),
-                  meta_read_calls(0),
+                  page_index_read_calls(0),
                   read_bytes(0),
                   decompress_time(0),
                   decompress_cnt(0),
@@ -74,7 +74,7 @@ public:
                    int64_t null_map_time)
                 : read_time(fs.read_time),
                   read_calls(fs.read_calls),
-                  meta_read_calls(0),
+                  page_index_read_calls(0),
                   read_bytes(fs.read_bytes),
                   decompress_time(cs.decompress_time),
                   decompress_cnt(cs.decompress_cnt),
@@ -88,7 +88,7 @@ public:
 
         int64_t read_time;
         int64_t read_calls;
-        int64_t meta_read_calls;
+        int64_t page_index_read_calls;
         int64_t read_bytes;
         int64_t decompress_time;
         int64_t decompress_cnt;
@@ -104,7 +104,7 @@ public:
             read_time += statistics.read_time;
             read_calls += statistics.read_calls;
             read_bytes += statistics.read_bytes;
-            meta_read_calls += statistics.meta_read_calls;
+            page_index_read_calls += statistics.page_index_read_calls;
             decompress_time += statistics.decompress_time;
             decompress_cnt += statistics.decompress_cnt;
             decode_header_time += statistics.decode_header_time;
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp 
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
index 98de497e320..fea7d9d545b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
@@ -44,7 +44,7 @@ Status FileMetaData::init_schema() {
     return _schema.parse_from_thrift(_metadata.schema);
 }
 
-const tparquet::FileMetaData& FileMetaData::to_thrift() {
+const tparquet::FileMetaData& FileMetaData::to_thrift() const {
     return _metadata;
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h 
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index d1ebb06957d..9dfdaf97beb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -31,10 +31,7 @@ public:
     ~FileMetaData();
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
-    const tparquet::FileMetaData& to_thrift();
-    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
-        _schema.iceberg_sanitize(read_columns);
-    }
+    const tparquet::FileMetaData& to_thrift() const;
     std::string debug_string() const;
     size_t get_mem_size() const { return _mem_size; }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 331e9696250..56d88430e16 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -90,18 +90,19 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams
           _ctz(ctz),
           _io_ctx(io_ctx),
           _state(state),
-          _meta_cache(meta_cache),
           _enable_lazy_mat(enable_lazy_mat),
           _enable_filter_by_min_max(
                   state == nullptr ? true
                                    : 
state->query_options().enable_parquet_filter_by_min_max) {
+    _meta_cache = meta_cache;
     _init_profile();
     _init_system_properties();
     _init_file_description();
 }
 
 ParquetReader::ParquetReader(const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-                             io::IOContext* io_ctx, RuntimeState* state, bool 
enable_lazy_mat)
+                             io::IOContext* io_ctx, RuntimeState* state, 
FileMetaCache* meta_cache,
+                             bool enable_lazy_mat)
         : _profile(nullptr),
           _scan_params(params),
           _scan_range(range),
@@ -111,6 +112,7 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& 
params, const TFileRang
           _enable_filter_by_min_max(
                   state == nullptr ? true
                                    : 
state->query_options().enable_parquet_filter_by_min_max) {
+    _meta_cache = meta_cache;
     _init_system_properties();
     _init_file_description();
 }
@@ -156,6 +158,8 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileOpenTime", 
parquet_profile, 1);
         _parquet_profile.open_file_num =
                 ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_index_read_calls =
+                ADD_COUNTER_WITH_LEVEL(_profile, "PageIndexReadCalls", 
TUnit::UNIT, 1);
         _parquet_profile.page_index_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexFilterTime", 
parquet_profile, 1);
         _parquet_profile.read_page_index_time =
@@ -164,8 +168,10 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", 
parquet_profile, 1);
         _parquet_profile.row_group_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", 
parquet_profile, 1);
-        _parquet_profile.file_meta_read_calls =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", 
TUnit::UNIT, 1);
+        _parquet_profile.file_footer_read_calls =
+                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", 
TUnit::UNIT, 1);
+        _parquet_profile.file_footer_hit_cache =
+                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", 
TUnit::UNIT, 1);
         _parquet_profile.decompress_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", 
parquet_profile, 1);
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -220,6 +226,7 @@ Status ParquetReader::_open_file() {
                                                  _file_reader, 
_io_ctx->file_reader_stats)
                                        : _file_reader;
     }
+
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
         if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
@@ -230,25 +237,30 @@ Status ParquetReader::_open_file() {
         }
         size_t meta_size = 0;
         if (_meta_cache == nullptr) {
-            auto st =
-                    parse_thrift_footer(_tracing_file_reader, &_file_metadata, 
&meta_size, _io_ctx);
-            // wrap it with unique ptr, so that it can be released finally.
-            _file_metadata_ptr.reset(_file_metadata);
-            RETURN_IF_ERROR(st);
+            // wrap _file_metadata with unique ptr, so that it can be released 
finally.
+            RETURN_IF_ERROR(parse_thrift_footer(_tracing_file_reader, 
&_file_metadata_ptr,
+                                                &meta_size, _io_ctx));
+            _file_metadata = _file_metadata_ptr.get();
 
             _column_statistics.read_bytes += meta_size;
             // parse magic number & parse meta data
-            _column_statistics.meta_read_calls += 1;
+            _statistics.file_footer_read_calls += 1;
         } else {
-            
RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_tracing_file_reader, _io_ctx,
-                                                            
_file_description.mtime, &meta_size,
-                                                            
&_meta_cache_handle));
-            _column_statistics.read_bytes += meta_size;
-            if (meta_size > 0) {
-                _column_statistics.meta_read_calls += 1;
+            const auto& file_meta_cache_key =
+                    FileMetaCache::get_key(_tracing_file_reader, 
_file_description);
+            if (!_meta_cache->lookup(file_meta_cache_key, 
&_meta_cache_handle)) {
+                RETURN_IF_ERROR(parse_thrift_footer(_file_reader, 
&_file_metadata_ptr, &meta_size,
+                                                    _io_ctx));
+                // _file_metadata_ptr.release() : move control of 
_file_metadata to _meta_cache_handle
+                _meta_cache->insert(file_meta_cache_key, 
_file_metadata_ptr.release(),
+                                    &_meta_cache_handle);
+                _file_metadata = _meta_cache_handle.data<FileMetaData>();
+                _column_statistics.read_bytes += meta_size;
+                _statistics.file_footer_read_calls += 1;
+            } else {
+                _statistics.file_footer_hit_cache++;
             }
-
-            _file_metadata = 
(FileMetaData*)_meta_cache_handle.data<FileMetaData>();
+            _file_metadata = _meta_cache_handle.data<FileMetaData>();
         }
 
         if (_file_metadata == nullptr) {
@@ -292,12 +304,6 @@ void ParquetReader::_init_file_description() {
     }
 }
 
-void ParquetReader::iceberg_sanitize(const std::vector<std::string>& 
read_columns) {
-    if (_file_metadata != nullptr) {
-        _file_metadata->iceberg_sanitize(read_columns);
-    }
-}
-
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
@@ -792,7 +798,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
     }
     _column_statistics.read_bytes += bytes_read;
     // read twice: parse column index & parse offset index
-    _column_statistics.meta_read_calls += 2;
+    _column_statistics.page_index_read_calls += 2;
     SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
 
     for (size_t idx = 0; idx < _read_table_columns.size(); idx++) {
@@ -1014,13 +1020,16 @@ void ParquetReader::_collect_profile() {
     COUNTER_UPDATE(_parquet_profile.read_page_index_time, 
_statistics.read_page_index_time);
     COUNTER_UPDATE(_parquet_profile.parse_page_index_time, 
_statistics.parse_page_index_time);
     COUNTER_UPDATE(_parquet_profile.row_group_filter_time, 
_statistics.row_group_filter_time);
+    COUNTER_UPDATE(_parquet_profile.file_footer_read_calls, 
_statistics.file_footer_read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache, 
_statistics.file_footer_hit_cache);
 
     COUNTER_UPDATE(_parquet_profile.skip_page_header_num, 
_column_statistics.skip_page_header_num);
     COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
                    _column_statistics.parse_page_header_num);
     COUNTER_UPDATE(_parquet_profile.predicate_filter_time, 
_statistics.predicate_filter_time);
     COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, 
_statistics.dict_filter_rewrite_time);
-    COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, 
_column_statistics.meta_read_calls);
+    COUNTER_UPDATE(_parquet_profile.page_index_read_calls,
+                   _column_statistics.page_index_read_calls);
     COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
     COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
     COUNTER_UPDATE(_parquet_profile.decode_header_time, 
_column_statistics.decode_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c560b1c3800..b6329397be6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -86,6 +86,8 @@ public:
         int64_t column_read_time = 0;
         int64_t parse_meta_time = 0;
         int64_t parse_footer_time = 0;
+        int64_t file_footer_read_calls = 0;
+        int64_t file_footer_hit_cache = 0;
         int64_t open_file_time = 0;
         int64_t open_file_num = 0;
         int64_t row_group_filter_time = 0;
@@ -102,7 +104,8 @@ public:
                   bool enable_lazy_mat = true);
 
     ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
-                  io::IOContext* io_ctx, RuntimeState* state, bool 
enable_lazy_mat = true);
+                  io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* 
meta_cache = nullptr,
+                  bool enable_lazy_mat = true);
 
     ~ParquetReader() override;
     // for unit test
@@ -143,9 +146,6 @@ public:
 
     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
 
-    // Only for iceberg reader to sanitize invalid column names
-    void iceberg_sanitize(const std::vector<std::string>& read_columns);
-
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, 
const SlotDescriptor*>>&
                     partition_columns,
@@ -174,11 +174,12 @@ private:
         RuntimeProfile::Counter* open_file_time = nullptr;
         RuntimeProfile::Counter* open_file_num = nullptr;
         RuntimeProfile::Counter* row_group_filter_time = nullptr;
+        RuntimeProfile::Counter* page_index_read_calls = nullptr;
         RuntimeProfile::Counter* page_index_filter_time = nullptr;
         RuntimeProfile::Counter* read_page_index_time = nullptr;
         RuntimeProfile::Counter* parse_page_index_time = nullptr;
-
-        RuntimeProfile::Counter* file_meta_read_calls = nullptr;
+        RuntimeProfile::Counter* file_footer_read_calls = nullptr;
+        RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
         RuntimeProfile::Counter* decompress_time = nullptr;
         RuntimeProfile::Counter* decompress_cnt = nullptr;
         RuntimeProfile::Counter* decode_header_time = nullptr;
@@ -240,7 +241,7 @@ private:
     // after _file_reader. Otherwise, there may be heap-use-after-free bug.
     ObjLRUCache::CacheHandle _meta_cache_handle;
     std::unique_ptr<FileMetaData> _file_metadata_ptr;
-    FileMetaData* _file_metadata = nullptr;
+    const FileMetaData* _file_metadata = nullptr;
     const tparquet::FileMetaData* _t_metadata = nullptr;
 
     // _tracing_file_reader wraps _file_reader.
@@ -290,9 +291,6 @@ private:
     bool _closed = false;
     io::IOContext* _io_ctx = nullptr;
     RuntimeState* _state = nullptr;
-    // Cache to save some common part such as file footer.
-    // Maybe null if not used
-    FileMetaCache* _meta_cache = nullptr;
     bool _enable_lazy_mat = true;
     bool _enable_filter_by_min_max = true;
     const TupleDescriptor* _tuple_descriptor = nullptr;
diff --git a/be/src/vec/exec/format/table/hive_reader.cpp 
b/be/src/vec/exec/format/table/hive_reader.cpp
index ee7f805b076..9fb084f1f91 100644
--- a/be/src/vec/exec/format/table/hive_reader.cpp
+++ b/be/src/vec/exec/format/table/hive_reader.cpp
@@ -46,7 +46,7 @@ Status HiveOrcReader::init_reader(
     if (_state->query_options().hive_orc_use_column_names && 
!is_hive_col_name) {
         // Directly use the table column name to match the file column name, 
but pay attention to the case issue.
         RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, 
orc_type_ptr,
-                                                        table_info_node_ptr));
+                                                        table_info_node_ptr, 
_is_file_slot));
     } else {
         // hive1 / use index
         std::map<std::string, const SlotDescriptor*> slot_map; // table_name 
to slot
@@ -70,6 +70,10 @@ Status HiveOrcReader::init_reader(
                 table_info_node_ptr->add_children(
                         table_column_name, 
orc_type_ptr->getFieldName(file_index), field_node);
             }
+            slot_map.erase(table_column_name);
+        }
+        for (const auto& [partition_col_name, _] : slot_map) {
+            table_info_node_ptr->add_not_exist_children(partition_col_name);
         }
     }
 
@@ -95,7 +99,7 @@ Status HiveParquetReader::init_reader(
     if (_state->query_options().hive_parquet_use_column_names) {
         // Directly use the table column name to match the file column name, 
but pay attention to the case issue.
         RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(tuple_descriptor, 
*field_desc,
-                                                            
table_info_node_ptr));
+                                                            
table_info_node_ptr, _is_file_slot));
     } else {                                                   // use idx
         std::map<std::string, const SlotDescriptor*> slot_map; //table_name to 
slot
         for (const auto& slot : tuple_descriptor->slots()) {
@@ -109,8 +113,10 @@ Status HiveParquetReader::init_reader(
             auto file_index = _params.column_idxs[idx];
 
             if (file_index >= parquet_fields_schema.size()) {
+                // Non-partitioning columns, which may be columns added later.
                 table_info_node_ptr->add_not_exist_children(table_column_name);
             } else {
+                // Non-partitioning columns, columns that exist in both the 
table and the file.
                 auto field_node = std::make_shared<Node>();
                 // for sub-columns, still use name to match columns.
                 RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(
@@ -119,6 +125,22 @@ Status HiveParquetReader::init_reader(
                 table_info_node_ptr->add_children(
                         table_column_name, 
parquet_fields_schema[file_index].name, field_node);
             }
+            slot_map.erase(table_column_name);
+        }
+        /*
+         * `_params.column_idxs` only have `isIsFileSlot()`, so we need add 
`partition slot`.
+         * eg:
+         * Table : A, B, C, D     (D: partition column)
+         * Parquet file : A, B
+         * Column C is obtained by add column.
+         *
+         * sql : select * from table;
+         * slot : A, B, C, D
+         * _params.column_idxs: 0, 1, 2 (There is no 3, because column D is 
the partition column)
+         *
+         */
+        for (const auto& [partition_col_name, _] : slot_map) {
+            table_info_node_ptr->add_not_exist_children(partition_col_name);
         }
     }
 
diff --git a/be/src/vec/exec/format/table/hive_reader.h 
b/be/src/vec/exec/format/table/hive_reader.h
index 2f2c1151799..6acd344e417 100644
--- a/be/src/vec/exec/format/table/hive_reader.h
+++ b/be/src/vec/exec/format/table/hive_reader.h
@@ -30,15 +30,21 @@ class HiveReader : public TableFormatReader, public 
TableSchemaChangeHelper {
 public:
     HiveReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                RuntimeState* state, const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-               io::IOContext* io_ctx)
+               io::IOContext* io_ctx, const std::set<TSlotId>* is_file_slot,
+               FileMetaCache* meta_cache)
             : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range,
-                                io_ctx) {};
+                                io_ctx, meta_cache),
+              _is_file_slot(is_file_slot) {};
 
     ~HiveReader() override = default;
 
     Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) 
final;
 
     Status init_row_filters() final { return Status::OK(); };
+
+protected:
+    // https://github.com/apache/doris/pull/23369
+    const std::set<TSlotId>* _is_file_slot = nullptr;
 };
 
 class HiveOrcReader final : public HiveReader {
@@ -46,8 +52,10 @@ public:
     ENABLE_FACTORY_CREATOR(HiveOrcReader);
     HiveOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                   RuntimeState* state, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : HiveReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                  const TFileRangeDesc& range, io::IOContext* io_ctx,
+                  const std::set<TSlotId>* is_file_slot, FileMetaCache* 
meta_cache)
+            : HiveReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                         is_file_slot, meta_cache) {};
     ~HiveOrcReader() final = default;
 
     Status init_reader(
@@ -65,8 +73,10 @@ public:
     ENABLE_FACTORY_CREATOR(HiveParquetReader);
     HiveParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                       RuntimeState* state, const TFileScanRangeParams& params,
-                      const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : HiveReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                      const TFileRangeDesc& range, io::IOContext* io_ctx,
+                      const std::set<TSlotId>* is_file_slot, FileMetaCache* 
meta_cache)
+            : HiveReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                         is_file_slot, meta_cache) {};
     ~HiveParquetReader() final = default;
 
     Status init_reader(
diff --git a/be/src/vec/exec/format/table/hudi_reader.h 
b/be/src/vec/exec/format/table/hudi_reader.h
index 751094018c9..50fc0e1b495 100644
--- a/be/src/vec/exec/format/table/hudi_reader.h
+++ b/be/src/vec/exec/format/table/hudi_reader.h
@@ -27,9 +27,9 @@ class HudiReader : public TableFormatReader, public 
TableSchemaChangeHelper {
 public:
     HudiReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                RuntimeState* state, const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-               io::IOContext* io_ctx)
+               io::IOContext* io_ctx, FileMetaCache* meta_cache)
             : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range,
-                                io_ctx) {};
+                                io_ctx, meta_cache) {};
 
     ~HudiReader() override = default;
 
@@ -43,8 +43,9 @@ public:
     ENABLE_FACTORY_CREATOR(HudiParquetReader);
     HudiParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                       RuntimeState* state, const TFileScanRangeParams& params,
-                      const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : HudiReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                      const TFileRangeDesc& range, io::IOContext* io_ctx, 
FileMetaCache* meta_cache)
+            : HudiReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                         meta_cache) {};
     ~HudiParquetReader() final = default;
 
     Status init_reader(
@@ -63,8 +64,9 @@ public:
     ENABLE_FACTORY_CREATOR(HudiOrcReader);
     HudiOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                   RuntimeState* state, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : HudiReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                  const TFileRangeDesc& range, io::IOContext* io_ctx, 
FileMetaCache* meta_cache)
+            : HudiReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                         meta_cache) {};
     ~HudiOrcReader() final = default;
 
     Status init_reader(
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index d6471a49efb..d777bf2dca0 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -78,8 +78,9 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
                                        RuntimeProfile* profile, RuntimeState* 
state,
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, 
ShardedKVCache* kv_cache,
-                                       io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx),
+                                       io::IOContext* io_ctx, FileMetaCache* 
meta_cache)
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx,
+                            meta_cache),
           _kv_cache(kv_cache) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
@@ -457,7 +458,7 @@ Status IcebergParquetReader 
::_read_position_delete_file(const TFileRangeDesc* d
                                                          DeleteFile* 
position_delete) {
     ParquetReader parquet_delete_reader(
             _profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE,
-            const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, 
_state);
+            const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, 
_state, _meta_cache);
     RETURN_IF_ERROR(parquet_delete_reader.init_reader(
             delete_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, 
nullptr, nullptr,
             TableSchemaChangeHelper::ConstNode::get_instance(), false));
@@ -537,7 +538,8 @@ Status IcebergOrcReader::init_reader(
 Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* 
delete_range,
                                                     DeleteFile* 
position_delete) {
     OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
-                                READ_DELETE_FILE_BATCH_SIZE, 
_state->timezone(), _io_ctx);
+                                READ_DELETE_FILE_BATCH_SIZE, 
_state->timezone(), _io_ctx,
+                                _meta_cache);
     std::unordered_map<std::string, ColumnValueRangeType> 
colname_to_value_range;
     RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, 
&colname_to_value_range,
                                                   {}, false, {}, {}, nullptr, 
nullptr));
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 67a1c1cc66b..cc88325acf9 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -74,8 +74,8 @@ public:
 
     IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                       io::IOContext* io_ctx);
+                       const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx,
+                       FileMetaCache* meta_cache);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters() final;
@@ -164,9 +164,9 @@ public:
     IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                          RuntimeState* state, const TFileScanRangeParams& 
params,
                          const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                         io::IOContext* io_ctx)
+                         io::IOContext* io_ctx, FileMetaCache* meta_cache)
             : IcebergTableReader(std::move(file_format_reader), profile, 
state, params, range,
-                                 kv_cache, io_ctx) {}
+                                 kv_cache, io_ctx, meta_cache) {}
     Status init_reader(
             const std::vector<std::string>& file_col_names,
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
@@ -187,9 +187,10 @@ public:
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
             const TFileRangeDesc& delete_desc) final {
-        return ParquetReader::create_unique(
-                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
-                const_cast<cctz::time_zone*>(&_state->timezone_obj()), 
_io_ctx, _state);
+        return ParquetReader::create_unique(_profile, _params, delete_desc,
+                                            READ_DELETE_FILE_BATCH_SIZE,
+                                            
const_cast<cctz::time_zone*>(&_state->timezone_obj()),
+                                            _io_ctx, _state, _meta_cache);
     }
 };
 class IcebergOrcReader final : public IcebergTableReader {
@@ -201,9 +202,10 @@ public:
 
     IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                      RuntimeState* state, const TFileScanRangeParams& params,
-                     const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx)
+                     const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx,
+                     FileMetaCache* meta_cache)
             : IcebergTableReader(std::move(file_format_reader), profile, 
state, params, range,
-                                 kv_cache, io_ctx) {}
+                                 kv_cache, io_ctx, meta_cache) {}
 
     void set_delete_rows() final {
         auto* orc_reader = (OrcReader*)_file_format_reader.get();
@@ -223,7 +225,8 @@ protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
             const TFileRangeDesc& delete_desc) override {
         return OrcReader::create_unique(_profile, _state, _params, delete_desc,
-                                        READ_DELETE_FILE_BATCH_SIZE, 
_state->timezone(), _io_ctx);
+                                        READ_DELETE_FILE_BATCH_SIZE, 
_state->timezone(), _io_ctx,
+                                        _meta_cache);
     }
 
 private:
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp 
b/be/src/vec/exec/format/table/paimon_reader.cpp
index 5a84a22863c..d5bb048ebf7 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -28,8 +28,9 @@ namespace doris::vectorized {
 PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
                            RuntimeProfile* profile, RuntimeState* state,
                            const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-                           io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx) {
+                           io::IOContext* io_ctx, FileMetaCache* meta_cache)
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx,
+                            meta_cache) {
     static const char* paimon_profile = "PaimonProfile";
     ADD_TIMER(_profile, paimon_profile);
     _paimon_profile.num_delete_rows =
diff --git a/be/src/vec/exec/format/table/paimon_reader.h 
b/be/src/vec/exec/format/table/paimon_reader.h
index eb6d909bac5..2bb8e105be5 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -30,7 +30,7 @@ class PaimonReader : public TableFormatReader, public 
TableSchemaChangeHelper {
 public:
     PaimonReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                  RuntimeState* state, const TFileScanRangeParams& params,
-                 const TFileRangeDesc& range, io::IOContext* io_ctx);
+                 const TFileRangeDesc& range, io::IOContext* io_ctx, 
FileMetaCache* meta_cache);
 
     ~PaimonReader() override = default;
 
@@ -54,8 +54,9 @@ public:
     ENABLE_FACTORY_CREATOR(PaimonOrcReader);
     PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                     RuntimeState* state, const TFileScanRangeParams& params,
-                    const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                    const TFileRangeDesc& range, io::IOContext* io_ctx, 
FileMetaCache* meta_cache)
+            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                           meta_cache) {};
     ~PaimonOrcReader() final = default;
 
     void set_delete_rows() final {
@@ -90,8 +91,10 @@ public:
     ENABLE_FACTORY_CREATOR(PaimonParquetReader);
     PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                         RuntimeState* state, const TFileScanRangeParams& 
params,
-                        const TFileRangeDesc& range, io::IOContext* io_ctx)
-            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
+                        const TFileRangeDesc& range, io::IOContext* io_ctx,
+                        FileMetaCache* meta_cache)
+            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx,
+                           meta_cache) {};
     ~PaimonParquetReader() final = default;
 
     void set_delete_rows() final {
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp 
b/be/src/vec/exec/format/table/table_format_reader.cpp
index 3bb2957981b..6ecad595f7b 100644
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ b/be/src/vec/exec/format/table/table_format_reader.cpp
@@ -38,7 +38,8 @@ const Status 
TableSchemaChangeHelper::BuildTableInfoUtil::SCHEMA_ERROR = Status:
 
 Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
         const TupleDescriptor* table_tuple_descriptor, const FieldDescriptor& 
parquet_field_desc,
-        std::shared_ptr<TableSchemaChangeHelper::Node>& node) {
+        std::shared_ptr<TableSchemaChangeHelper::Node>& node,
+        const std::set<TSlotId>* is_file_slot) {
     auto struct_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
     auto parquet_fields_schema = parquet_field_desc.get_fields_schema();
     std::map<std::string, size_t> file_column_name_idx_map;
@@ -48,8 +49,9 @@ Status 
TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
 
     for (const auto& slot : table_tuple_descriptor->slots()) {
         const auto& table_column_name = slot->col_name();
-
-        if (file_column_name_idx_map.contains(table_column_name)) {
+        // https://github.com/apache/doris/pull/23369/files
+        if ((is_file_slot == nullptr || is_file_slot->contains(slot->id())) &&
+            file_column_name_idx_map.contains(table_column_name)) {
             auto file_column_idx = file_column_name_idx_map[table_column_name];
             std::shared_ptr<TableSchemaChangeHelper::Node> field_node = 
nullptr;
             RETURN_IF_ERROR(by_parquet_name(slot->get_data_type_ptr(),
@@ -159,7 +161,8 @@ Status 
TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
 
 Status TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
         const TupleDescriptor* table_tuple_descriptor, const orc::Type* 
orc_type_ptr,
-        std::shared_ptr<TableSchemaChangeHelper::Node>& node) {
+        std::shared_ptr<TableSchemaChangeHelper::Node>& node,
+        const std::set<TSlotId>* is_file_slot) {
     auto struct_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
 
     std::map<std::string, uint64_t> file_column_name_idx_map;
@@ -170,7 +173,8 @@ Status 
TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
 
     for (const auto& slot : table_tuple_descriptor->slots()) {
         const auto& table_column_name = slot->col_name();
-        if (file_column_name_idx_map.contains(table_column_name)) {
+        if ((is_file_slot == nullptr || is_file_slot->contains(slot->id())) &&
+            file_column_name_idx_map.contains(table_column_name)) {
             auto file_column_idx = file_column_name_idx_map[table_column_name];
             std::shared_ptr<TableSchemaChangeHelper::Node> field_node = 
nullptr;
             RETURN_IF_ERROR(by_orc_name(slot->get_data_type_ptr(),
diff --git a/be/src/vec/exec/format/table/table_format_reader.h 
b/be/src/vec/exec/format/table/table_format_reader.h
index 1f79de54ed4..86f30ecb169 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -47,13 +47,14 @@ class TableFormatReader : public GenericReader {
 public:
     TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeState* state,
                       RuntimeProfile* profile, const TFileScanRangeParams& 
params,
-                      const TFileRangeDesc& range, io::IOContext* io_ctx)
+                      const TFileRangeDesc& range, io::IOContext* io_ctx, 
FileMetaCache* meta_cache)
             : _file_format_reader(std::move(file_format_reader)),
               _state(state),
               _profile(profile),
               _params(params),
               _range(range),
               _io_ctx(io_ctx) {
+        _meta_cache = meta_cache;
         if (range.table_format_params.__isset.table_level_row_count) {
             _table_level_row_count = 
range.table_format_params.table_level_row_count;
         } else {
@@ -330,7 +331,8 @@ public:
         // for hive parquet : The table column names passed from fe are 
lowercase, so use lowercase file column names to match table column names.
         static Status by_parquet_name(const TupleDescriptor* 
table_tuple_descriptor,
                                       const FieldDescriptor& 
parquet_field_desc,
-                                      
std::shared_ptr<TableSchemaChangeHelper::Node>& node);
+                                      
std::shared_ptr<TableSchemaChangeHelper::Node>& node,
+                                      const std::set<TSlotId>* is_file_slot = 
nullptr);
 
         // for hive parquet
         static Status by_parquet_name(const DataTypePtr& table_data_type,
@@ -340,7 +342,8 @@ public:
         // for hive orc: The table column names passed from fe are lowercase, 
so use lowercase file column names to match table column names.
         static Status by_orc_name(const TupleDescriptor* 
table_tuple_descriptor,
                                   const orc::Type* orc_type_ptr,
-                                  
std::shared_ptr<TableSchemaChangeHelper::Node>& node);
+                                  
std::shared_ptr<TableSchemaChangeHelper::Node>& node,
+                                  const std::set<TSlotId>* is_file_slot = 
nullptr);
         // for hive orc
         static Status by_orc_name(const DataTypePtr& table_data_type, const 
orc::Type* orc_root,
                                   
std::shared_ptr<TableSchemaChangeHelper::Node>& node);
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp 
b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 79e611fe066..87b3f9c2dfb 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -38,8 +38,10 @@ namespace doris::vectorized {
 
TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> 
file_format_reader,
                                                  RuntimeProfile* profile, 
RuntimeState* state,
                                                  const TFileScanRangeParams& 
params,
-                                                 const TFileRangeDesc& range, 
io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx) {
+                                                 const TFileRangeDesc& range, 
io::IOContext* io_ctx,
+                                                 FileMetaCache* meta_cache)
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx,
+                            meta_cache) {
     static const char* transactional_hive_profile = "TransactionalHiveProfile";
     ADD_TIMER(_profile, transactional_hive_profile);
     _transactional_orc_profile.num_delete_files =
@@ -163,7 +165,7 @@ Status TransactionalHiveReader::init_row_filters() {
         delete_range.file_size = -1;
 
         OrcReader delete_reader(_profile, _state, _params, delete_range, 
_MIN_BATCH_SIZE,
-                                _state->timezone(), _io_ctx, false);
+                                _state->timezone(), _io_ctx, _meta_cache, 
false);
 
         auto acid_info_node = std::make_shared<StructNode>();
         for (auto idx = 0; idx < 
TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE.size();
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h 
b/be/src/vec/exec/format/table/transactional_hive_reader.h
index 60114bbb29c..e8f43210396 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -83,7 +83,7 @@ public:
     TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
                             RuntimeProfile* profile, RuntimeState* state,
                             const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-                            io::IOContext* io_ctx);
+                            io::IOContext* io_ctx, FileMetaCache* meta_cache);
     ~TransactionalHiveReader() override = default;
 
     Status init_row_filters() final;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6e197ccee46..3665e52270f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -1006,12 +1006,14 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
+            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
+                                               ? 
ExecEnv::GetInstance()->file_meta_cache()
+                                               : nullptr;
             std::unique_ptr<ParquetReader> parquet_reader = 
ParquetReader::create_unique(
                     _profile, *_params, range, 
_state->query_options().batch_size,
                     const_cast<cctz::time_zone*>(&_state->timezone_obj()), 
_io_ctx.get(), _state,
-                    _should_enable_file_meta_cache() ? 
ExecEnv::GetInstance()->file_meta_cache()
-                                                     : nullptr,
-                    _state->query_options().enable_parquet_lazy_mat);
+                    file_meta_cache_ptr, 
_state->query_options().enable_parquet_lazy_mat);
+
             // ATTN: the push down agg type may be set back to NONE,
             // see IcebergTableReader::init_row_filters for example.
             parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
@@ -1023,7 +1025,7 @@ Status VFileScanner::_get_next_reader() {
                 std::unique_ptr<IcebergParquetReader> iceberg_reader =
                         
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                             _state, *_params, 
range, _kv_cache,
-                                                            _io_ctx.get());
+                                                            _io_ctx.get(), 
file_meta_cache_ptr);
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(), 
_col_name_to_slot_id,
@@ -1033,7 +1035,8 @@ Status VFileScanner::_get_next_reader() {
                        range.table_format_params.table_format_type == 
"paimon") {
                 std::unique_ptr<PaimonParquetReader> paimon_reader =
                         
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
-                                                           _state, *_params, 
range, _io_ctx.get());
+                                                           _state, *_params, 
range, _io_ctx.get(),
+                                                           
file_meta_cache_ptr);
                 init_status = paimon_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(), 
_col_name_to_slot_id,
@@ -1042,18 +1045,19 @@ Status VFileScanner::_get_next_reader() {
                 _cur_reader = std::move(paimon_reader);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "hudi") {
-                std::unique_ptr<HudiParquetReader> hudi_reader =
-                        
HudiParquetReader::create_unique(std::move(parquet_reader), _profile,
-                                                         _state, *_params, 
range, _io_ctx.get());
+                std::unique_ptr<HudiParquetReader> hudi_reader = 
HudiParquetReader::create_unique(
+                        std::move(parquet_reader), _profile, _state, *_params, 
range, _io_ctx.get(),
+                        file_meta_cache_ptr);
                 init_status = hudi_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(), 
_col_name_to_slot_id,
                         &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
                 _cur_reader = std::move(hudi_reader);
             } else if (range.table_format_params.table_format_type == "hive") {
-                auto hive_reader =
-                        
HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
-                                                         _state, *_params, 
range, _io_ctx.get());
+                auto hive_reader = HiveParquetReader::create_unique(
+                        std::move(parquet_reader), _profile, _state, *_params, 
range, _io_ctx.get(),
+                        &_is_file_slot, file_meta_cache_ptr);
+
                 init_status = hive_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(), 
_col_name_to_slot_id,
@@ -1112,9 +1116,14 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
+            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
+                                               ? 
ExecEnv::GetInstance()->file_meta_cache()
+                                               : nullptr;
             std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                     _profile, _state, *_params, range, 
_state->query_options().batch_size,
-                    _state->timezone(), _io_ctx.get(), 
_state->query_options().enable_orc_lazy_mat);
+                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
+                    _state->query_options().enable_orc_lazy_mat);
+
             orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
             if (push_down_predicates) {
                 RETURN_IF_ERROR(_process_late_arrival_conjuncts());
@@ -1124,7 +1133,7 @@ Status VFileScanner::_get_next_reader() {
                 std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                         
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
                                                                _state, 
*_params, range,
-                                                               _io_ctx.get());
+                                                               _io_ctx.get(), 
file_meta_cache_ptr);
                 init_status = tran_orc_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(),
@@ -1133,9 +1142,9 @@ Status VFileScanner::_get_next_reader() {
                 _cur_reader = std::move(tran_orc_reader);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == 
"iceberg") {
-                std::unique_ptr<IcebergOrcReader> iceberg_reader =
-                        IcebergOrcReader::create_unique(std::move(orc_reader), 
_profile, _state,
-                                                        *_params, range, 
_kv_cache, _io_ctx.get());
+                std::unique_ptr<IcebergOrcReader> iceberg_reader = 
IcebergOrcReader::create_unique(
+                        std::move(orc_reader), _profile, _state, *_params, 
range, _kv_cache,
+                        _io_ctx.get(), file_meta_cache_ptr);
 
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
@@ -1145,7 +1154,8 @@ Status VFileScanner::_get_next_reader() {
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == 
"paimon") {
                 std::unique_ptr<PaimonOrcReader> paimon_reader = 
PaimonOrcReader::create_unique(
-                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get());
+                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get(),
+                        file_meta_cache_ptr);
 
                 init_status = paimon_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
@@ -1156,7 +1166,8 @@ Status VFileScanner::_get_next_reader() {
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "hudi") {
                 std::unique_ptr<HudiOrcReader> hudi_reader = 
HudiOrcReader::create_unique(
-                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get());
+                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get(),
+                        file_meta_cache_ptr);
 
                 init_status = hudi_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
@@ -1166,8 +1177,8 @@ Status VFileScanner::_get_next_reader() {
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "hive") {
                 std::unique_ptr<HiveOrcReader> hive_reader = 
HiveOrcReader::create_unique(
-                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get());
-
+                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get(),
+                        &_is_file_slot, file_meta_cache_ptr);
                 init_status = hive_reader->init_reader(
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(),
@@ -1428,6 +1439,7 @@ Status VFileScanner::_init_expr_ctxes() {
                     fmt::format("Unknown source slot descriptor, slot_id={}", 
slot_id));
         }
         if (slot_info.is_file_slot) {
+            _is_file_slot.emplace(slot_id);
             _file_slot_descs.emplace_back(it->second);
             _file_col_names.push_back(it->second->col_name());
         }
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 7e58d258d46..437a87a6816 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -30,6 +30,7 @@
 #include "common/global_types.h"
 #include "common/status.h"
 #include "exec/olap_common.h"
+#include "io/fs/file_meta_cache.h"
 #include "io/io_common.h"
 #include "pipeline/exec/file_scan_operator.h"
 #include "runtime/descriptors.h"
@@ -151,6 +152,7 @@ protected:
     // owned by scan node
     ShardedKVCache* _kv_cache = nullptr;
 
+    std::set<TSlotId> _is_file_slot;
     bool _scanner_eof = false;
     int _rows = 0;
     int _num_of_columns_from_file;
@@ -247,7 +249,7 @@ private:
     // 2. the file number is less than 1/3 of cache's capacibility
     // Otherwise, the cache miss rate will be high
     bool _should_enable_file_meta_cache() {
-        return config::max_external_file_meta_cache_num > 0 &&
+        return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
                _split_source->num_scan_ranges() < 
config::max_external_file_meta_cache_num / 3;
     }
 };
diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp 
b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
new file mode 100644
index 00000000000..3aef8db8459
--- /dev/null
+++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/file_meta_cache.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "io/fs/file_reader.h"
+
+namespace doris {
+
+class MockFileReader : public io::FileReader {
+public:
+    MockFileReader(const std::string& file_name, size_t size)
+            : _file_name(file_name), _size(size), _closed(false) {}
+    ~MockFileReader() override = default;
+
+    const io::Path& path() const override {
+        static io::Path p(_file_name);
+        return p;
+    }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+
+private:
+    std::string _file_name;
+    size_t _size;
+    bool _closed;
+};
+
+TEST(FileMetaCacheTest, KeyGenerationFromParams) {
+    std::string file_name = "/path/to/file";
+    int64_t mtime = 123456789;
+    int64_t file_size = 987654321;
+
+    std::string key1 = FileMetaCache::get_key(file_name, mtime, file_size);
+    std::string key2 = FileMetaCache::get_key(file_name, mtime, file_size);
+    EXPECT_EQ(key1, key2) << "Same parameters should produce same key";
+
+    // Different mtime should produce different key
+    std::string key3 = FileMetaCache::get_key(file_name, mtime + 1, file_size);
+    EXPECT_NE(key1, key3);
+
+    // mtime == 0, use file_size
+    std::string key4 = FileMetaCache::get_key(file_name, 0, file_size);
+    std::string key5 = FileMetaCache::get_key(file_name, 0, file_size);
+    EXPECT_EQ(key4, key5);
+    EXPECT_NE(key1, key4);
+
+    // mtime == 0, different file_size
+    std::string key6 = FileMetaCache::get_key(file_name, 0, file_size + 1);
+    EXPECT_NE(key4, key6);
+}
+
+TEST(FileMetaCacheTest, KeyGenerationFromFileReader) {
+    std::string file_name = "/path/to/file";
+    int64_t mtime = 123456789;
+    int64_t file_size = 100;
+
+    // file_description.file_size != -1, use it as file size
+    io::FileDescription desc1;
+    desc1.mtime = mtime;
+    desc1.file_size = file_size;
+    auto reader1 = std::make_shared<MockFileReader>(file_name, 200);
+
+    std::string key1 = FileMetaCache::get_key(reader1, desc1);
+    std::string expected_key1 = FileMetaCache::get_key(file_name, mtime, 
file_size);
+    EXPECT_EQ(key1, expected_key1);
+
+    // file_description.file_size == -1, use reader->size()
+    io::FileDescription desc2;
+    desc2.mtime = 0;
+    desc2.file_size = -1;
+    auto reader2 = std::make_shared<MockFileReader>(file_name, 300);
+
+    std::string key2 = FileMetaCache::get_key(reader2, desc2);
+    std::string expected_key2 = FileMetaCache::get_key(file_name, 0, 300);
+    EXPECT_EQ(key2, expected_key2);
+}
+TEST(FileMetaCacheTest, KeyContentVerification) {
+    std::string file_name = "/path/to/file";
+    int64_t mtime = 0x0102030405060708;
+    int64_t file_size = 0x1112131415161718;
+
+    std::string key_with_mtime = FileMetaCache::get_key(file_name, mtime, 
file_size);
+
+    ASSERT_EQ(key_with_mtime.size(), file_name.size() + sizeof(int64_t));
+
+    EXPECT_EQ(memcmp(key_with_mtime.data(), file_name.data(), 
file_name.size()), 0);
+
+    int64_t extracted_mtime = 0;
+    memcpy(&extracted_mtime, key_with_mtime.data() + file_name.size(), 
sizeof(int64_t));
+    EXPECT_EQ(extracted_mtime, mtime);
+
+    std::string key_with_filesize = FileMetaCache::get_key(file_name, 0, 
file_size);
+    ASSERT_EQ(key_with_filesize.size(), file_name.size() + sizeof(int64_t));
+    EXPECT_EQ(memcmp(key_with_filesize.data(), file_name.data(), 
file_name.size()), 0);
+    int64_t extracted_filesize = 0;
+    memcpy(&extracted_filesize, key_with_filesize.data() + file_name.size(), 
sizeof(int64_t));
+    EXPECT_EQ(extracted_filesize, file_size);
+}
+
+TEST(FileMetaCacheTest, InsertAndLookupWithIntValue) {
+    FileMetaCache cache(1024 * 1024);
+
+    int* value = new int(12345);
+    ObjLRUCache::CacheHandle handle;
+
+    cache.insert("key_int", value, &handle);
+    ASSERT_NE(handle._cache, nullptr);
+
+    const int* cached_val = handle.data<int>();
+    ASSERT_NE(cached_val, nullptr);
+    EXPECT_EQ(*cached_val, 12345);
+
+    ObjLRUCache::CacheHandle handle2;
+    cache.lookup("key_int", &handle2);
+
+    ASSERT_NE(handle2._cache, nullptr);
+
+    const int* cached_val2 = handle2.data<int>();
+    ASSERT_NE(cached_val2, nullptr);
+    EXPECT_EQ(*cached_val2, 12345);
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
index 9a0c9c22c35..feb922c8f0b 100644
--- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
@@ -34,6 +34,7 @@
 #include "common/object_pool.h"
 #include "exec/olap_common.h"
 #include "gtest/gtest_pred_impl.h"
+#include "io/fs/file_meta_cache.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/file_system.h"
 #include "io/fs/local_file_system.h"
@@ -53,7 +54,9 @@ class VExprContext;
 
 class ParquetReaderTest : public testing::Test {
 public:
-    ParquetReaderTest() {}
+    ParquetReaderTest() : cache(1024) {}
+
+    FileMetaCache cache;
 };
 
 static void create_table_desc(TDescriptorTable& t_desc_table, 
TTableDescriptor& t_table_desc,
@@ -142,8 +145,8 @@ TEST_F(ParquetReaderTest, normal) {
         scan_range.start_offset = 0;
         scan_range.size = 1000;
     }
-    auto p_reader =
-            new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, 
nullptr, nullptr);
+    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, 
&ctz, nullptr, nullptr,
+                                      &cache);
     p_reader->set_file_reader(reader);
     RuntimeState runtime_state((TQueryGlobals()));
     runtime_state.set_desc_tbl(desc_tbl);
diff --git a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
index 66b13dd7445..08c541fca5e 100644
--- a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
@@ -76,7 +76,7 @@ TEST_F(ParquetThriftReaderTest, normal) {
                                   &reader);
     EXPECT_TRUE(st.ok());
 
-    FileMetaData* meta_data;
+    std::unique_ptr<FileMetaData> meta_data;
     size_t meta_size;
     static_cast<void>(parse_thrift_footer(reader, &meta_data, &meta_size, 
nullptr));
     tparquet::FileMetaData t_metadata = meta_data->to_thrift();
@@ -92,7 +92,6 @@ TEST_F(ParquetThriftReaderTest, normal) {
         LOG(WARNING) << "schema column repetition_type: " << 
value.repetition_type;
         LOG(WARNING) << "schema column num children: " << value.num_children;
     }
-    delete meta_data;
 }
 
 TEST_F(ParquetThriftReaderTest, complex_nested_file) {
@@ -110,7 +109,7 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
                                   &reader);
     EXPECT_TRUE(st.ok());
 
-    FileMetaData* metadata;
+    std::unique_ptr<FileMetaData> metadata;
     size_t meta_size;
     static_cast<void>(parse_thrift_footer(reader, &metadata, &meta_size, 
nullptr));
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
@@ -157,7 +156,6 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
 
     ASSERT_EQ(schemaDescriptor.get_column_index("friend"), 3);
     ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
-    delete metadata;
 }
 
 static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions, 
size_t num_values) {
@@ -399,7 +397,8 @@ static void read_parquet_data_and_check(const std::string& 
parquet_file,
 
     std::unique_ptr<vectorized::Block> block;
     create_block(block);
-    FileMetaData* metadata;
+
+    std::unique_ptr<FileMetaData> metadata;
     size_t meta_size;
     static_cast<void>(parse_thrift_footer(reader, &metadata, &meta_size, 
nullptr));
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
@@ -446,7 +445,6 @@ static void read_parquet_data_and_check(const std::string& 
parquet_file,
     Slice res(result_buf.data(), result->size());
     static_cast<void>(result->read_at(0, res, &bytes_read));
     ASSERT_STREQ(block->dump_data(0, rows).c_str(), 
reinterpret_cast<char*>(result_buf.data()));
-    delete metadata;
 }
 
 TEST_F(ParquetThriftReaderTest, type_decoder) {
diff --git a/be/test/vec/exec/orc/orc_convert_dict_test.cpp 
b/be/test/vec/exec/orc/orc_convert_dict_test.cpp
index bce08cc63db..0aea8438048 100644
--- a/be/test/vec/exec/orc/orc_convert_dict_test.cpp
+++ b/be/test/vec/exec/orc/orc_convert_dict_test.cpp
@@ -82,7 +82,7 @@ TEST_F(OrcReaderConvertDictTest, 
ConvertDictColumnToStringColumnBasic) {
 
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
     // Execute conversion
     auto result_column = reader->_convert_dict_column_to_string_column(
@@ -119,7 +119,7 @@ TEST_F(OrcReaderConvertDictTest, 
ConvertDictColumnToStringColumnWithNulls) {
 
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
     // Execute conversion
     auto result_column = _reader->_convert_dict_column_to_string_column(
@@ -151,7 +151,7 @@ TEST_F(OrcReaderConvertDictTest, 
ConvertDictColumnToStringColumnChar) {
     auto orc_type_ptr = createPrimitiveType(orc::TypeKind::CHAR);
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
     // Execute conversion
     auto result_column = _reader->_convert_dict_column_to_string_column(
@@ -182,7 +182,7 @@ TEST_F(OrcReaderConvertDictTest, 
ConvertDictColumnToStringColumnEmpty) {
     auto orc_type_ptr = createPrimitiveType(orc::TypeKind::STRING);
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
     // Execute conversion
     auto result_column = _reader->_convert_dict_column_to_string_column(
             dict_column.get(), nullptr, string_batch.get(), 
orc_type_ptr.get());
@@ -214,7 +214,7 @@ TEST_F(OrcReaderConvertDictTest, 
ConvertDictColumnToStringColumnMixed) {
     auto orc_type_ptr = createPrimitiveType(orc::TypeKind::STRING);
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
     // Execute conversion
     auto result_column = _reader->_convert_dict_column_to_string_column(
             dict_column.get(), &null_map, string_batch.get(), 
orc_type_ptr.get());
diff --git a/be/test/vec/exec/orc/orc_reader_fill_data_test.cpp 
b/be/test/vec/exec/orc/orc_reader_fill_data_test.cpp
index dcdc07349b3..0c9e5a29eae 100644
--- a/be/test/vec/exec/orc/orc_reader_fill_data_test.cpp
+++ b/be/test/vec/exec/orc/orc_reader_fill_data_test.cpp
@@ -81,7 +81,7 @@ TEST_F(OrcReaderFillDataTest, TestFillLongColumn) {
 
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
     MutableColumnPtr xx = column->assume_mutable();
 
@@ -107,7 +107,7 @@ TEST_F(OrcReaderFillDataTest, TestFillLongColumnWithNull) {
 
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
     MutableColumnPtr xx = column->assume_mutable();
 
@@ -161,7 +161,7 @@ TEST_F(OrcReaderFillDataTest, ComplexTypeConversionTest) {
 
         TFileScanRangeParams params;
         TFileRangeDesc range;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
         auto doris_struct_type = std::make_shared<DataTypeStruct>(
                 std::vector<DataTypePtr> {
@@ -247,7 +247,7 @@ TEST_F(OrcReaderFillDataTest, ComplexTypeConversionTest) {
 
         TFileScanRangeParams params;
         TFileRangeDesc range;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
         auto doris_struct_type = std::make_shared<DataTypeStruct>(
                 std::vector<DataTypePtr> {std::make_shared<DataTypeInt32>(),
@@ -333,7 +333,7 @@ TEST_F(OrcReaderFillDataTest, ComplexTypeConversionTest) {
 
         TFileScanRangeParams params;
         TFileRangeDesc range;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
         auto doris_struct_type = std::make_shared<DataTypeStruct>(
                 std::vector<DataTypePtr> 
{std::make_shared<DataTypeDecimal<Decimal64>>(18, 5)},
@@ -447,7 +447,7 @@ TEST_F(OrcReaderFillDataTest, ComplexTypeConversionTest) {
 
         TFileScanRangeParams params;
         TFileRangeDesc range;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
 
         auto doris_struct_type = 
std::make_shared<DataTypeMap>(std::make_shared<DataTypeInt32>(),
                                                                
std::make_shared<DataTypeFloat32>());
diff --git a/be/test/vec/exec/orc/orc_reader_init_column_test.cpp 
b/be/test/vec/exec/orc/orc_reader_init_column_test.cpp
index 841b7fb813e..c4c2ca51a38 100644
--- a/be/test/vec/exec/orc/orc_reader_init_column_test.cpp
+++ b/be/test/vec/exec/orc/orc_reader_init_column_test.cpp
@@ -54,7 +54,7 @@ TEST_F(OrcReaderInitColumnTest, InitReadColumn) {
 
         TFileScanRangeParams params;
         TFileRangeDesc range;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
         reader->_reader = std::move(orc_reader);
         std::vector<std::string> tmp;
         tmp.emplace_back("col1");
@@ -73,7 +73,7 @@ TEST_F(OrcReaderInitColumnTest, CheckAcidSchemaTest) {
     using namespace orc;
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
     // 1. Test standard ACID schema
     {
         // Create standard ACID structure
@@ -140,7 +140,7 @@ TEST_F(OrcReaderInitColumnTest, RemoveAcidTest) {
     using namespace orc;
     TFileScanRangeParams params;
     TFileRangeDesc range;
-    auto _reader = OrcReader::create_unique(params, range, "", nullptr, true);
+    auto _reader = OrcReader::create_unique(params, range, "", nullptr, 
nullptr, true);
     // 1. Test removing ACID info from ACID schema
     {
         // Create ACID schema
diff --git a/be/test/vec/exec/orc_reader_test.cpp 
b/be/test/vec/exec/orc_reader_test.cpp
index ff7452ae625..45b15cd414e 100644
--- a/be/test/vec/exec/orc_reader_test.cpp
+++ b/be/test/vec/exec/orc_reader_test.cpp
@@ -23,6 +23,7 @@
 #include <tuple>
 #include <vector>
 
+#include "io/fs/file_meta_cache.h"
 #include "orc/sargs/SearchArgument.hh"
 #include "runtime/define_primitive_type.h"
 #include "runtime/exec_env.h"
@@ -36,9 +37,11 @@
 namespace doris::vectorized {
 class OrcReaderTest : public testing::Test {
 public:
-    OrcReaderTest() = default;
+    OrcReaderTest() : cache(1024) {}
     ~OrcReaderTest() override = default;
 
+    FileMetaCache cache;
+
 private:
     static constexpr const char* CANNOT_PUSH_DOWN_ERROR = "can't push down";
     std::string build_search_argument(const std::string& expr) {
@@ -65,7 +68,7 @@ private:
         range.path = "./be/test/exec/test_data/orc_scanner/orders.orc";
         range.start_offset = 0;
         range.size = 1293;
-        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
true);
+        auto reader = OrcReader::create_unique(params, range, "", nullptr, 
&cache, true);
         auto status = reader->init_reader(&column_names, nullptr, {}, false, 
tuple_desc, &row_desc,
                                           nullptr, nullptr);
         EXPECT_TRUE(status.ok());
diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run19.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run19.sql
index 8bd75b2809f..dbc0b857f44 100644
--- 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run19.sql
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run19.sql
@@ -362,4 +362,44 @@ VALUES (1, 'z', 0.00),
     (7, 'n1', -1.23),
     (8, 'n20', -20.00),
     (9, 'big', 9999999.99),
-    (10, 'null', NULL);
\ No newline at end of file
+    (10, 'null', NULL);
+
+
+create database if not exists demo.test_db;
+use demo.test_db;
+
+
+CREATE TABLE test_invalid_avro_name_parquet (
+    id INT,
+    `TEST:A1B2.RAW.ABC-GG-1-A` STRING
+)
+USING iceberg
+TBLPROPERTIES(
+  'write.format.default' = 'parquet'
+);
+
+
+CREATE TABLE test_invalid_avro_name_orc (
+    id INT,
+    `TEST:A1B2.RAW.ABC-GG-1-A` STRING
+)
+USING iceberg
+TBLPROPERTIES(
+  'write.format.default' = 'orc'
+);
+
+INSERT INTO test_invalid_avro_name_parquet VALUES
+    (1, 'row1'),
+    (2, 'row2'),
+    (3, 'row3'),
+    (4, 'row4'),
+    (5, 'row5');
+
+INSERT INTO test_invalid_avro_name_orc VALUES
+    (1, 'row1'),
+    (2, 'row2'),
+    (3, 'row3'),
+    (4, 'row4'),
+    (5, 'row5');
+
+
diff --git 
a/regression-test/data/external_table_p0/hive/test_external_catalog_hive.out 
b/regression-test/data/external_table_p0/hive/test_external_catalog_hive.out
index a55a5bfee1a..ba84b42aa43 100644
Binary files 
a/regression-test/data/external_table_p0/hive/test_external_catalog_hive.out 
and 
b/regression-test/data/external_table_p0/hive/test_external_catalog_hive.out 
differ
diff --git 
a/regression-test/data/external_table_p0/hive/test_file_meta_cache.out 
b/regression-test/data/external_table_p0/hive/test_file_meta_cache.out
new file mode 100644
index 00000000000..0c003035956
Binary files /dev/null and 
b/regression-test/data/external_table_p0/hive/test_file_meta_cache.out differ
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_invaild_avro_name.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_invaild_avro_name.out
new file mode 100644
index 00000000000..1b8af743c6a
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_invaild_avro_name.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/hive/test_external_catalog_hive.groovy
 
b/regression-test/suites/external_table_p0/hive/test_external_catalog_hive.groovy
index 16c8c1f6707..0f30fd0e504 100644
--- 
a/regression-test/suites/external_table_p0/hive/test_external_catalog_hive.groovy
+++ 
b/regression-test/suites/external_table_p0/hive/test_external_catalog_hive.groovy
@@ -108,6 +108,8 @@ suite("test_external_catalog_hive", 
"p0,external,hive,external_docker,external_d
         //qt_null_expr_dict_filter_orc """ select count(*), count(distinct 
user_no) from multi_catalog.dict_fitler_test_orc WHERE `partitions` in 
('2023-08-21') and actual_intf_type  =  'type1' and (REUSE_FLAG<> 'y' or 
REUSE_FLAG is null); """
         //qt_null_expr_dict_filter_parquet """ select count(*), count(distinct 
user_no) from multi_catalog.dict_fitler_test_parquet WHERE `partitions` in 
('2023-08-21') and actual_intf_type  =  'type1' and (REUSE_FLAG<> 'y' or 
REUSE_FLAG is null); """
 
+        sql """set  hive_orc_use_column_names = true """
+        sql """set  hive_parquet_use_column_names = true """
         // test par fields in file
         qt_par_fields_in_file_orc1 """ select * from 
multi_catalog.par_fields_in_file_orc where year = 2023 and month = 8 order by 
id; """
         qt_par_fields_in_file_parquet1 """ select * from 
multi_catalog.par_fields_in_file_parquet where year = 2023 and month = 8 order 
by id; """
@@ -120,6 +122,22 @@ suite("test_external_catalog_hive", 
"p0,external,hive,external_docker,external_d
         qt_par_fields_in_file_orc5 """ select * from 
multi_catalog.par_fields_in_file_orc where month = 8 and year = 2022 order by 
id; """
         qt_par_fields_in_file_parquet5 """ select * from 
multi_catalog.par_fields_in_file_parquet where month = 8 and year = 2022 order 
by id; """
 
+        sql """set  hive_orc_use_column_names = false; """
+        sql """set  hive_parquet_use_column_names = false"""
+        qt_par_fields_in_file_orc1 """ select * from 
multi_catalog.par_fields_in_file_orc where year = 2023 and month = 8 order by 
id; """
+        qt_par_fields_in_file_parquet1 """ select * from 
multi_catalog.par_fields_in_file_parquet where year = 2023 and month = 8 order 
by id; """
+        qt_par_fields_in_file_orc2 """ select * from 
multi_catalog.par_fields_in_file_orc where year = 2023 order by id; """
+        qt_par_fields_in_file_parquet2 """ select * from 
multi_catalog.par_fields_in_file_parquet where year = 2023 order by id; """
+        qt_par_fields_in_file_orc3 """ select * from 
multi_catalog.par_fields_in_file_orc where month = 8 order by id; """
+        qt_par_fields_in_file_parquet3 """ select * from 
multi_catalog.par_fields_in_file_parquet where month = 8 order by id; """
+        qt_par_fields_in_file_orc4 """ select * from 
multi_catalog.par_fields_in_file_orc where month = 8 and year >= 2022 order by 
id; """
+        qt_par_fields_in_file_parquet4 """ select * from 
multi_catalog.par_fields_in_file_parquet where month = 8 and year >= 2022 order 
by id; """
+        qt_par_fields_in_file_orc5 """ select * from 
multi_catalog.par_fields_in_file_orc where month = 8 and year = 2022 order by 
id; """
+        qt_par_fields_in_file_parquet5 """ select * from 
multi_catalog.par_fields_in_file_parquet where month = 8 and year = 2022 order 
by id; """
+
+
+        sql """set  hive_orc_use_column_names = true """
+        sql """set  hive_parquet_use_column_names = true """
         // timestamp with isAdjustedToUTC=true
         qt_parquet_adjusted_utc """select * from 
multi_catalog.timestamp_with_time_zone order by date_col;"""
 
diff --git 
a/regression-test/suites/external_table_p0/hive/test_file_meta_cache.groovy 
b/regression-test/suites/external_table_p0/hive/test_file_meta_cache.groovy
new file mode 100644
index 00000000000..6fc88e63d54
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/test_file_meta_cache.groovy
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_file_meta_cache", 
"p0,external,hive,external_docker,external_docker_hive") {
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("diable Hive test.")
+        return;
+    }
+
+
+
+
+    for (String fileFormat : ["PARQUET",  "ORC"] ) {
+        for (String hivePrefix : ["hive2", "hive3"]) {
+            setHivePrefix(hivePrefix)
+            try {
+                String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+                String hms_port = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
+
+                sql """ drop catalog if exists test_file_meta_cache """
+                sql """CREATE CATALOG test_file_meta_cache PROPERTIES (
+                    'type'='hms',
+                    'hive.metastore.uris' = 
'thrift://${externalEnvIp}:${hms_port}'
+                );"""
+
+                hive_docker """show databases;"""
+                hive_docker """drop table if exists 
default.test_file_meta_cache;  """
+                hive_docker """
+                                create table default.test_file_meta_cache 
(col1  int, col2 string) STORED AS ${fileFormat}; 
+                            """
+                hive_docker """insert into default.test_file_meta_cache values 
(1, "a"),(2, "b"); """
+
+                sql """ refresh  catalog test_file_meta_cache """ 
+                qt_1 """ select * from 
test_file_meta_cache.`default`.test_file_meta_cache order by col1 ; """
+
+                hive_docker """ TRUNCATE TABLE test_file_meta_cache """ 
+                hive_docker """insert into default.test_file_meta_cache values 
(3, "c"), (4, "d"); """
+                
+                sql """ refresh  catalog test_file_meta_cache """ 
+                qt_2 """ select * from 
test_file_meta_cache.`default`.test_file_meta_cache order by col1 ; """
+
+                
+
+                hive_docker """ drop TABLE test_file_meta_cache """ 
+                hive_docker """
+                                create table default.test_file_meta_cache 
(col1  int, col2 string) STORED AS PARQUET; 
+                            """
+                hive_docker """insert into default.test_file_meta_cache values 
(5, "e"), (6, "f"); """
+                
+                sql """ refresh  catalog test_file_meta_cache """ 
+                qt_3 """ select * from 
test_file_meta_cache.`default`.test_file_meta_cache order by col1 ; """
+
+                hive_docker """ INSERT OVERWRITE TABLE test_file_meta_cache 
values (7,'g'), (8, 'h'); """
+
+                sql """ refresh  catalog test_file_meta_cache """ 
+                qt_4 """ select * from 
test_file_meta_cache.`default`.test_file_meta_cache order by col1 ; """
+
+
+            } finally {
+            }
+        }
+    }
+
+}
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_invaild_avro_name.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_invaild_avro_name.groovy
new file mode 100644
index 00000000000..cf79d6d1fef
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_invaild_avro_name.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_invaild_avro_name", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "test_iceberg_invaild_avro_name"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;""" 
+
+    def tables  = ["test_invalid_avro_name_parquet", 
"test_invalid_avro_name_orc"]
+
+
+    for (String table: tables) {
+        qt_desc """ desc ${table} """
+        qt_q_1 """ SELECT * FROM ${table} order by id;"""
+        qt_q_2 """ SELECT * FROM ${table} WHERE `TEST:A1B2.RAW.ABC-GG-1-A` = 
'row3' order by id;"""
+        qt_q_3 """ SELECT id FROM ${table} WHERE `TEST:A1B2.RAW.ABC-GG-1-A` 
LIKE 'row%' order by id;"""
+        qt_q_4 """ SELECT * FROM ${table} ORDER BY `TEST:A1B2.RAW.ABC-GG-1-A` 
DESC;"""
+
+    }
+}
+
+
+
+
+
+
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to