This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1-lakehouse by this push:
     new cf6eadb4c28 [opt](parquet-reader)Implement late materialization of parquet complex types. (#44098)
cf6eadb4c28 is described below

commit cf6eadb4c28a1343b69915df69c4ac91d5d966c1
Author: Qi Chen <che...@selectdb.com>
AuthorDate: Thu Dec 26 11:23:18 2024 +0800

    [opt](parquet-reader)Implement late materialization of parquet complex types. (#44098)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    Late materialization is not supported when querying fields with complex
    types.
    
    ### Release note
    [opt](parquet-reader)Implement late materialization of parquet complex types.
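    
    Illustration (a minimal, self-contained sketch, not part of the committed code):
    the core step for complex types is expanding the row-level filter map produced
    by the predicate columns into a value-level map via repetition levels, because
    one row of an array/map/struct column spans several stored values. The helper
    name below is hypothetical; the expected mapping matches the new
    parquet_common_test.cpp case.
    
    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>
    
    // A value with rep_level == 0 starts a new top-level row; every other value
    // belongs to the current row and inherits that row's filter flag.
    std::vector<uint8_t> expand_row_filter_to_values(const std::vector<uint8_t>& row_filter,
                                                     const std::vector<int16_t>& rep_levels) {
        std::vector<uint8_t> value_filter(rep_levels.size());
        size_t row = 0;
        for (size_t i = 0; i < rep_levels.size(); ++i) {
            if (i != 0 && rep_levels[i] == 0) {
                ++row;
            }
            value_filter[i] = row_filter[row];
        }
        return value_filter;
    }
    
    int main() {
        // rows {a, b, c}, {d, e}, {f}; only rows 0 and 2 survive the filter
        assert((expand_row_filter_to_values({1, 0, 1}, {0, 1, 1, 0, 1, 0}) ==
                std::vector<uint8_t>{1, 1, 1, 0, 0, 1}));
        return 0;
    }
    ```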
---
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 152 +++----
 be/src/vec/exec/format/parquet/parquet_common.h    | 156 +++++--
 .../exec/format/parquet/vparquet_column_reader.cpp | 170 ++++++--
 .../exec/format/parquet/vparquet_column_reader.h   |  60 ++-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  58 +--
 .../exec/format/parquet/vparquet_group_reader.h    |   6 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |   3 +-
 be/test/vec/exec/parquet/parquet_common_test.cpp   | 457 +++++++++++++++++++++
 .../parquet_nested_type_cross_page_test.cpp        | 179 ++++++++
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   8 +-
 .../parquet_nested_types/create_table.hql          |  58 +++
 .../multi_catalog/parquet_nested_types/data.tar.gz | Bin 0 -> 36976 bytes
 .../data_gen_scripts/nested_cross_page_test1.py    | 192 +++++++++
 .../data_gen_scripts/nested_cross_page_test2.py    | 287 +++++++++++++
 .../data_gen_scripts/nested_cross_page_test3.py    | 196 +++++++++
 .../data/multi_catalog/parquet_nested_types/run.sh |  12 +
 .../hive/test_parquet_nested_types.out             | Bin 0 -> 181799 bytes
 .../hive/test_parquet_nested_types.groovy          | 209 ++++++++++
 18 files changed, 1980 insertions(+), 223 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 59e12fcc71a..f71f511edd3 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -28,24 +28,19 @@ const int32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588;
 const int64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
 const int64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;
 
-ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size,
-                                       bool filter_all) {
-    build(filter_map, filter_map_size, filter_all);
-}
-
-void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all) {
+Status FilterMap::init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all) {
     _filter_all = filter_all;
-    _filter_map = filter_map;
+    _filter_map_data = filter_map_data;
     _filter_map_size = filter_map_size;
     if (filter_all) {
         _has_filter = true;
         _filter_ratio = 1;
-    } else if (filter_map == nullptr) {
+    } else if (filter_map_data == nullptr) {
         _has_filter = false;
         _filter_ratio = 0;
     } else {
-        size_t filter_count =
-                simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map), filter_map_size);
+        size_t filter_count = simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map_data),
+                                                   filter_map_size);
         if (filter_count == filter_map_size) {
             _has_filter = true;
             _filter_all = true;
@@ -58,109 +53,68 @@ void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size
             _filter_ratio = 0;
         }
     }
+    return Status::OK();
 }
 
-void ColumnSelectVector::set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
-                                                 size_t num_values, NullMap* null_map) {
-    _num_values = num_values;
-    _num_nulls = 0;
-    _read_index = 0;
-    size_t map_index = 0;
-    bool is_null = false;
-    if (_has_filter) {
-        // No run length null map is generated when _filter_all = true
-        DCHECK(!_filter_all);
-        _data_map.resize(num_values);
-        for (auto& run_length : run_length_null_map) {
-            if (is_null) {
-                _num_nulls += run_length;
-                for (int i = 0; i < run_length; ++i) {
-                    _data_map[map_index++] = FILTERED_NULL;
-                }
-            } else {
-                for (int i = 0; i < run_length; ++i) {
-                    _data_map[map_index++] = FILTERED_CONTENT;
-                }
-            }
-            is_null = !is_null;
-        }
-        size_t num_read = 0;
-        DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
-        for (size_t i = 0; i < num_values; ++i) {
-            if (_filter_map[_filter_map_index++]) {
-                _data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT;
-                num_read++;
-            }
-        }
-        _num_filtered = num_values - num_read;
-        if (null_map != nullptr && num_read > 0) {
-            NullMap& map_data_column = *null_map;
-            auto null_map_index = map_data_column.size();
-            map_data_column.resize(null_map_index + num_read);
-            if (_num_nulls == 0) {
-                memset(map_data_column.data() + null_map_index, 0, num_read);
-            } else if (_num_nulls == num_values) {
-                memset(map_data_column.data() + null_map_index, 1, num_read);
-            } else {
-                for (size_t i = 0; i < num_values; ++i) {
-                    if (_data_map[i] == CONTENT) {
-                        map_data_column[null_map_index++] = (UInt8) false;
-                    } else if (_data_map[i] == NULL_DATA) {
-                        map_data_column[null_map_index++] = (UInt8) true;
-                    }
-                }
-            }
-        }
-    } else {
-        _num_filtered = 0;
-        _run_length_null_map = &run_length_null_map;
-        if (null_map != nullptr) {
-            NullMap& map_data_column = *null_map;
-            auto null_map_index = map_data_column.size();
-            map_data_column.resize(null_map_index + num_values);
-
-            for (auto& run_length : run_length_null_map) {
-                if (is_null) {
-                    memset(map_data_column.data() + null_map_index, 1, run_length);
-                    null_map_index += run_length;
-                    _num_nulls += run_length;
-                } else {
-                    memset(map_data_column.data() + null_map_index, 0, run_length);
-                    null_map_index += run_length;
-                }
-                is_null = !is_null;
-            }
-        } else {
-            for (auto& run_length : run_length_null_map) {
-                if (is_null) {
-                    _num_nulls += run_length;
-                }
-                is_null = !is_null;
-            }
-        }
-    }
-}
-
-bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) {
+bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_index) {
     if (!_has_filter) {
         return false;
     }
     if (_filter_all) {
         // all data in normal columns can be skipped when _filter_all = true,
         // so the remaining_num_values should be less than the remaining filter map size.
-        DCHECK_LE(remaining_num_values + _filter_map_index, _filter_map_size);
+        DCHECK_LE(remaining_num_values + filter_map_index, _filter_map_size);
         // return true always, to make sure that the data in normal columns can be skipped.
         return true;
     }
-    if (remaining_num_values + _filter_map_index > _filter_map_size) {
+    if (remaining_num_values + filter_map_index > _filter_map_size) {
         return false;
     }
-    return simd::count_zero_num(reinterpret_cast<const int8_t*>(_filter_map + _filter_map_index),
-                                remaining_num_values) == remaining_num_values;
-}
+    return simd::count_zero_num(
+                   reinterpret_cast<const int8_t*>(_filter_map_data + filter_map_index),
+                   remaining_num_values) == remaining_num_values;
+}
+
+Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
+                                             std::vector<uint8_t>& nested_filter_map_data,
+                                             std::unique_ptr<FilterMap>* nested_filter_map,
+                                             size_t* current_row_ptr, size_t start_index) const {
+    if (!has_filter() || filter_all()) {
+        return Status::InternalError(fmt::format(
+                "FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
+                has_filter(), filter_all()));
+    }
+
+    if (rep_levels.empty()) {
+        return Status::OK();
+    }
+
+    nested_filter_map_data.resize(rep_levels.size());
 
-void ColumnSelectVector::skip(size_t num_values) {
-    _filter_map_index += num_values;
+    size_t current_row = current_row_ptr ? *current_row_ptr : 0;
+
+    for (size_t i = start_index; i < rep_levels.size(); i++) {
+        if (i != start_index && rep_levels[i] == 0) {
+            current_row++;
+            if (current_row >= _filter_map_size) {
+                return Status::InvalidArgument(fmt::format(
+                        "current_row >= _filter_map_size. current_row: {}, _filter_map_size: {}",
+                        current_row, _filter_map_size));
+            }
+        }
+        nested_filter_map_data[i] = _filter_map_data[current_row];
+    }
+
+    if (current_row_ptr) {
+        *current_row_ptr = current_row;
+    }
+
+    auto new_filter = std::make_unique<FilterMap>();
+    RETURN_IF_ERROR(
+            new_filter->init(nested_filter_map_data.data(), nested_filter_map_data.size(), false));
+    *nested_filter_map = std::move(new_filter);
+
+    return Status::OK();
 }
 
 ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
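
A hedged, self-contained sketch of the FilterMap::can_filter_all semantics introduced above, using plain std types instead of the Doris classes (the real code bounds-checks with DCHECK and counts zeros with simd::count_zero_num): a run of values may be skipped only when every corresponding filter-map entry is zero.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified stand-in for FilterMap::can_filter_all(): true only if the whole
// run [start, start + count) is filtered out, so decoding can be skipped.
bool can_filter_all(const std::vector<uint8_t>& filter_map, size_t start, size_t count) {
    if (start + count > filter_map.size()) {
        return false;
    }
    for (size_t i = start; i < start + count; ++i) {
        if (filter_map[i] != 0) {
            return false; // at least one surviving row, the run must be decoded
        }
    }
    return true;
}

int main() {
    std::vector<uint8_t> filter = {0, 0, 0, 1, 0};
    bool skip_first_three = can_filter_all(filter, 0, 3); // true
    bool skip_all_five = can_filter_all(filter, 0, 5);    // false: row 3 survives
    return (skip_first_three && !skip_all_five) ? 0 : 1;
}
```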
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index da374d5fe79..e4c394c05d2 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -24,6 +24,7 @@
 #include <ostream>
 #include <regex>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include "vec/columns/column_nullable.h"
@@ -69,42 +70,148 @@ struct ParquetInt96 {
 #pragma pack()
 static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12.");
 
-class ColumnSelectVector {
+class FilterMap {
 public:
-    enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };
+    FilterMap() = default;
+    Status init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all);
 
-    ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);
+    Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
+                                      std::vector<uint8_t>& nested_filter_map_data,
+                                      std::unique_ptr<FilterMap>* nested_filter_map,
+                                      size_t* current_row_ptr, size_t start_index = 0) const;
 
-    ColumnSelectVector() = default;
+    const uint8_t* filter_map_data() const { return _filter_map_data; }
+    size_t filter_map_size() const { return _filter_map_size; }
+    bool has_filter() const { return _has_filter; }
+    bool filter_all() const { return _filter_all; }
+    double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }
 
-    void build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);
+    bool can_filter_all(size_t remaining_num_values, size_t filter_map_index);
 
-    const uint8_t* filter_map() { return _filter_map; }
+private:
+    bool _has_filter = false;
+    bool _filter_all = false;
+    const uint8_t* _filter_map_data = nullptr;
+    size_t _filter_map_size = 0;
+    double _filter_ratio = 0;
+};
 
-    size_t num_values() const { return _num_values; }
+class ColumnSelectVector {
+public:
+    enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };
 
-    size_t num_nulls() const { return _num_nulls; }
+    ColumnSelectVector() = default;
 
-    size_t num_filtered() const { return _num_filtered; }
+    Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
+                NullMap* null_map, FilterMap* filter_map, size_t filter_map_index,
+                const std::unordered_set<size_t>* skipped_indices = nullptr) {
+        _num_values = num_values;
+        _num_nulls = 0;
+        _read_index = 0;
+        size_t map_index = 0;
+        bool is_null = false;
+        _has_filter = filter_map->has_filter();
+
+        if (filter_map->has_filter()) {
+            // No run length null map is generated when _filter_all = true
+            DCHECK(!filter_map->filter_all());
+            _data_map.resize(num_values);
+            for (auto& run_length : run_length_null_map) {
+                if (is_null) {
+                    _num_nulls += run_length;
+                    for (int i = 0; i < run_length; ++i) {
+                        _data_map[map_index++] = FILTERED_NULL;
+                    }
+                } else {
+                    for (int i = 0; i < run_length; ++i) {
+                        _data_map[map_index++] = FILTERED_CONTENT;
+                    }
+                }
+                is_null = !is_null;
+            }
 
-    double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }
+            size_t num_read = 0;
+            size_t i = 0;
+            size_t valid_count = 0;
 
-    void fallback_filter() { _has_filter = false; }
+            while (valid_count < num_values) {
+                DCHECK_LT(filter_map_index + i, filter_map->filter_map_size());
 
-    bool has_filter() const { return _has_filter; }
+                if (skipped_indices != nullptr &&
+                    skipped_indices->count(filter_map_index + i) > 0) {
+                    ++i;
+                    continue;
+                }
 
-    bool can_filter_all(size_t remaining_num_values);
+                if (filter_map->filter_map_data()[filter_map_index + i]) {
+                    _data_map[valid_count] =
+                            _data_map[valid_count] == FILTERED_NULL ? NULL_DATA : CONTENT;
+                    num_read++;
+                }
+                ++valid_count;
+                ++i;
+            }
 
-    bool filter_all() const { return _filter_all; }
+            _num_filtered = num_values - num_read;
 
-    void skip(size_t num_values);
+            if (null_map != nullptr && num_read > 0) {
+                NullMap& map_data_column = *null_map;
+                auto null_map_index = map_data_column.size();
+                map_data_column.resize(null_map_index + num_read);
 
-    void reset() {
-        if (_has_filter) {
-            _filter_map_index = 0;
+                if (_num_nulls == 0) {
+                    memset(map_data_column.data() + null_map_index, 0, num_read);
+                } else if (_num_nulls == num_values) {
+                    memset(map_data_column.data() + null_map_index, 1, num_read);
+                } else {
+                    for (size_t i = 0; i < num_values; ++i) {
+                        if (_data_map[i] == CONTENT) {
+                            map_data_column[null_map_index++] = (UInt8) false;
+                        } else if (_data_map[i] == NULL_DATA) {
+                            map_data_column[null_map_index++] = (UInt8) true;
+                        }
+                    }
+                }
+            }
+        } else {
+            _num_filtered = 0;
+            _run_length_null_map = &run_length_null_map;
+            if (null_map != nullptr) {
+                NullMap& map_data_column = *null_map;
+                auto null_map_index = map_data_column.size();
+                map_data_column.resize(null_map_index + num_values);
+
+                for (auto& run_length : run_length_null_map) {
+                    if (is_null) {
+                        memset(map_data_column.data() + null_map_index, 1, run_length);
+                        null_map_index += run_length;
+                        _num_nulls += run_length;
+                    } else {
+                        memset(map_data_column.data() + null_map_index, 0, run_length);
+                        null_map_index += run_length;
+                    }
+                    is_null = !is_null;
+                }
+            } else {
+                for (auto& run_length : run_length_null_map) {
+                    if (is_null) {
+                        _num_nulls += run_length;
+                    }
+                    is_null = !is_null;
+                }
+            }
         }
+        return Status::OK();
     }
 
+    size_t num_values() const { return _num_values; }
+
+    size_t num_nulls() const { return _num_nulls; }
+
+    size_t num_filtered() const { return _num_filtered; }
+
+    bool has_filter() const { return _has_filter; }
+
     template <bool has_filter>
     size_t get_next_run(DataReadType* data_read_type) {
         DCHECK_EQ(_has_filter, has_filter);
@@ -137,22 +244,11 @@ public:
         }
     }
 
-    void set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
-                                 size_t num_values, NullMap* null_map = nullptr);
-
 private:
     std::vector<DataReadType> _data_map;
     // the length of non-null values and null values are arranged in turn.
     const std::vector<uint16_t>* _run_length_null_map;
-    bool _has_filter = false;
-    // only used when the whole batch is skipped
-    bool _filter_all = false;
-    const uint8_t* _filter_map = nullptr;
-    size_t _filter_map_size = 0;
-    double _filter_ratio = 0;
-    size_t _filter_map_index = 0;
-
-    // generated in set_run_length_null_map
+    bool _has_filter;
     size_t _num_values;
     size_t _num_nulls;
     size_t _num_filtered;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index fd3200b3640..207b917666b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -238,7 +238,7 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
 }
 
 Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column,
-                                        DataTypePtr& type, ColumnSelectVector& select_vector,
+                                        DataTypePtr& type, FilterMap& filter_map,
                                         bool is_dict_filter) {
     if (num_values == 0) {
         return Status::OK();
@@ -301,9 +301,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
         }
         null_map.emplace_back((u_short)remaining);
     }
+    ColumnSelectVector select_vector;
     {
         SCOPED_RAW_TIMER(&_decode_null_map_time);
-        select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
+        RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map,
+                                           _filter_map_index));
+        _filter_map_index += num_values;
     }
     return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
 }
@@ -314,9 +317,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
  * whether the reader should read the remaining value of the last row in previous page.
  */
 Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
-                                               ColumnSelectVector& select_vector, size_t batch_size,
+                                               FilterMap& filter_map, size_t batch_size,
                                                size_t* read_rows, bool* eof, bool is_dict_filter,
-                                               bool align_rows = false) {
+                                               bool align_rows) {
+    std::unique_ptr<FilterMap> nested_filter_map;
+
+    FilterMap* current_filter_map = &filter_map;
     size_t origin_size = 0;
     if (align_rows) {
         origin_size = _rep_levels.size();
@@ -326,17 +332,22 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     } else {
         _rep_levels.resize(0);
         _def_levels.resize(0);
+        if (_nested_filter_map_data) {
+            _nested_filter_map_data->resize(0);
+        }
     }
     size_t parsed_rows = 0;
     size_t remaining_values = _chunk_reader->remaining_num_values();
     bool has_rep_level = _chunk_reader->max_rep_level() > 0;
     bool has_def_level = _chunk_reader->max_def_level() > 0;
 
+    // Handle repetition levels (indicates nesting structure)
     if (has_rep_level) {
         LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
+        // Read repetition levels until batch is full or no more values
         while (parsed_rows <= batch_size && remaining_values > 0) {
             level_t rep_level = rep_decoder.get_next();
-            if (rep_level == 0) {
+            if (rep_level == 0) { // rep_level 0 indicates start of new row
                 if (parsed_rows == batch_size) {
                     rep_decoder.rewind_one();
                     break;
@@ -346,12 +357,26 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
             _rep_levels.emplace_back(rep_level);
             remaining_values--;
         }
+
+        // Generate nested filter map
+        if (filter_map.has_filter() && (!filter_map.filter_all())) {
+            if (_nested_filter_map_data == nullptr) {
+                _nested_filter_map_data.reset(new std::vector<uint8_t>());
+            }
+            RETURN_IF_ERROR(filter_map.generate_nested_filter_map(
+                    _rep_levels, *_nested_filter_map_data, &nested_filter_map,
+                    &_orig_filter_map_index, origin_size));
+            // Update current_filter_map to nested_filter_map
+            current_filter_map = nested_filter_map.get();
+        }
     } else if (!align_rows) {
         // case : required child columns in struct type
         parsed_rows = std::min(remaining_values, batch_size);
         remaining_values -= parsed_rows;
         _rep_levels.resize(parsed_rows, 0);
     }
+
+    // Process definition levels (indicates null values)
     size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
     _def_levels.resize(origin_size + parsed_values);
     if (has_def_level) {
@@ -360,6 +385,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
     }
 
+    // Handle nullable columns
     MutableColumnPtr data_column;
     std::vector<uint16_t> null_map;
     NullMap* map_data_column = nullptr;
@@ -375,10 +401,16 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         }
         data_column = doris_column->assume_mutable();
     }
+
+    // Process definition levels to build null map
     size_t has_read = origin_size;
     size_t ancestor_nulls = 0;
+    size_t null_size = 0;
+    size_t nonnull_size = 0;
     null_map.emplace_back(0);
     bool prev_is_null = false;
+    std::unordered_set<size_t> ancestor_null_indices;
+
     while (has_read < origin_size + parsed_values) {
         level_t def_level = _def_levels[has_read++];
         size_t loop_read = 1;
@@ -386,16 +418,23 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
             has_read++;
             loop_read++;
         }
+
         if (def_level < _field_schema->repeated_parent_def_level) {
-            // when def_level is less than repeated_parent_def_level, it means that level
-            // will affect its ancestor.
+            for (size_t i = 0; i < loop_read; i++) {
+                ancestor_null_indices.insert(has_read - loop_read + i);
+            }
             ancestor_nulls += loop_read;
             continue;
         }
+
         bool is_null = def_level < _field_schema->definition_level;
+        if (is_null) {
+            null_size += loop_read;
+        } else {
+            nonnull_size += loop_read;
+        }
+
         if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
-            // If whether the values are nullable in current loop is the same the previous values,
-            // we can save the memory usage in null map
             null_map.back() += loop_read;
         } else {
             if (!(prev_is_null ^ is_null)) {
@@ -413,29 +452,78 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }
 
     size_t num_values = parsed_values - ancestor_nulls;
-    {
-        SCOPED_RAW_TIMER(&_decode_null_map_time);
-        select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
-    }
-    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
-    if (ancestor_nulls != 0) {
-        RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
-    }
 
-    if (!align_rows) {
-        *read_rows = parsed_rows;
+    // Handle filtered values
+    if (current_filter_map->filter_all()) {
+        // Skip all values if everything is filtered
+        if (null_size > 0) {
+            RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
+        }
+        if (nonnull_size > 0) {
+            RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
+        }
+        if (ancestor_nulls != 0) {
+            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
+        }
+    } else {
+        ColumnSelectVector select_vector;
+        {
+            SCOPED_RAW_TIMER(&_decode_null_map_time);
+            RETURN_IF_ERROR(
+                    select_vector.init(null_map, num_values, map_data_column, current_filter_map,
+                                       _nested_filter_map_data ? origin_size : _filter_map_index,
+                                       &ancestor_null_indices));
+        }
+
+        RETURN_IF_ERROR(
+                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
+        if (ancestor_nulls != 0) {
+            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
+        }
     }
+    *read_rows += parsed_rows;
+    _filter_map_index += parsed_values;
+
+    // Handle cross-page reading
     if (_chunk_reader->remaining_num_values() == 0) {
         if (_chunk_reader->has_next_page()) {
             RETURN_IF_ERROR(_chunk_reader->next_page());
             RETURN_IF_ERROR(_chunk_reader->load_page_data());
-            select_vector.reset();
-            return _read_nested_column(doris_column, type, select_vector, 0, 
read_rows, eof,
+            return _read_nested_column(doris_column, type, filter_map, 0, 
read_rows, eof,
                                        is_dict_filter, true);
         } else {
             *eof = true;
         }
     }
+
+    // Apply filtering to repetition and definition levels
+    if (current_filter_map->has_filter()) {
+        if (current_filter_map->filter_all()) {
+            _rep_levels.resize(0);
+            _def_levels.resize(0);
+        } else {
+            std::vector<level_t> filtered_rep_levels;
+            std::vector<level_t> filtered_def_levels;
+            filtered_rep_levels.reserve(_rep_levels.size());
+            filtered_def_levels.reserve(_def_levels.size());
+
+            const uint8_t* filter_map_data = current_filter_map->filter_map_data();
+
+            for (size_t i = 0; i < _rep_levels.size(); i++) {
+                if (filter_map_data[i]) {
+                    filtered_rep_levels.push_back(_rep_levels[i]);
+                    filtered_def_levels.push_back(_def_levels[i]);
+                }
+            }
+
+            _rep_levels = std::move(filtered_rep_levels);
+            _def_levels = std::move(filtered_def_levels);
+        }
+    }
+
+    // Prepare for next row
+    ++_orig_filter_map_index;
+
     if (_rep_levels.size() > 0) {
         // make sure the rows of complex type are aligned correctly,
         // so the repetition level of first element should be 0, meaning a new row is started.
@@ -443,6 +531,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }
     return Status::OK();
 }
+
 Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_column,
                                                       bool* has_dict) {
     bool loaded;
@@ -474,7 +563,7 @@ Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) {
 }
 
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                                            ColumnSelectVector& select_vector, size_t batch_size,
+                                            FilterMap& filter_map, size_t batch_size,
                                             size_t* read_rows, bool* eof, bool is_dict_filter) {
     if (_converter == nullptr) {
         _converter = parquet::PhysicalToLogicalConverter::get_converter(
@@ -499,8 +588,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         }
         if (_nested_column) {
             RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
-            RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, select_vector,
-                                                batch_size, read_rows, eof, is_dict_filter));
+            RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map,
+                                                batch_size, read_rows, eof, is_dict_filter, false));
             break;
         }
 
@@ -518,16 +607,16 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             bool skip_whole_batch = false;
             // Determining whether to skip page or batch will increase the calculation time.
             // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
-            if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) {
+            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
                 // lazy read
                 size_t remaining_num_values = 0;
                 for (auto& range : read_ranges) {
                     remaining_num_values += range.last_row - range.first_row;
                 }
                 if (batch_size >= remaining_num_values &&
-                    select_vector.can_filter_all(remaining_num_values)) {
+                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
                     // We can skip the whole page if the remaining values is filtered by predicate columns
-                    select_vector.skip(remaining_num_values);
+                    _filter_map_index += remaining_num_values;
                     _current_row_index += _chunk_reader->remaining_num_values();
                     RETURN_IF_ERROR(_chunk_reader->skip_page());
                     *read_rows = remaining_num_values;
@@ -537,9 +626,9 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
                     break;
                 }
                 skip_whole_batch = batch_size <= remaining_num_values &&
-                                   select_vector.can_filter_all(batch_size);
+                                   filter_map.can_filter_all(batch_size, _filter_map_index);
                 if (skip_whole_batch) {
-                    select_vector.skip(batch_size);
+                    _filter_map_index += batch_size;
                 }
             }
             // load page data to decode or skip values
@@ -557,7 +646,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
                     RETURN_IF_ERROR(_skip_values(read_values));
                 } else {
                     RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
-                                                 select_vector, is_dict_filter));
+                                                 filter_map, is_dict_filter));
                 }
                 has_read += read_values;
                 _current_row_index += read_values;
@@ -585,7 +674,7 @@ Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read
 }
 
 Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                                           ColumnSelectVector& select_vector, size_t batch_size,
+                                           FilterMap& filter_map, size_t batch_size,
                                            size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
@@ -611,7 +700,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
             (reinterpret_cast<const DataTypeArray*>(remove_nullable(type).get()))
                     ->get_nested_type());
     // read nested column
-    RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type, select_vector,
+    RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type, filter_map,
                                                       batch_size, read_rows, eof, is_dict_filter));
     if (*read_rows == 0) {
         return Status::OK();
@@ -636,7 +725,7 @@ Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
 }
 
 Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                                         ColumnSelectVector& select_vector, size_t batch_size,
+                                         FilterMap& filter_map, size_t batch_size,
                                          size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
@@ -669,12 +758,12 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
     size_t value_rows = 0;
     bool key_eof = false;
     bool value_eof = false;
-    RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, select_vector, batch_size,
+    RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, filter_map, batch_size,
                                                   &key_rows, &key_eof, is_dict_filter));
+
     while (value_rows < key_rows && !value_eof) {
         size_t loop_rows = 0;
-        select_vector.reset();
-        RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, select_vector,
+        RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, filter_map,
                                                         key_rows - value_rows, &loop_rows,
                                                         &value_eof, is_dict_filter));
         value_rows += loop_rows;
@@ -705,7 +794,7 @@ Status StructColumnReader::init(
     return Status::OK();
 }
 Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                                            ColumnSelectVector& select_vector, size_t batch_size,
+                                            FilterMap& filter_map, size_t batch_size,
                                             size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
@@ -748,22 +837,21 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
 
         _read_column_names.insert(doris_name);
 
-        select_vector.reset();
+        //        select_vector.reset();
         size_t field_rows = 0;
         bool field_eof = false;
         if (not_missing_column_id == -1) {
             not_missing_column_id = i;
             RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
-                    doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof,
+                    doris_field, doris_type, filter_map, batch_size, &field_rows, &field_eof,
                     is_dict_filter));
             *read_rows = field_rows;
             *eof = field_eof;
         } else {
             while (field_rows < *read_rows && !field_eof) {
                 size_t loop_rows = 0;
-                select_vector.reset();
                 RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data(
-                        doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows,
+                        doris_field, doris_type, filter_map, *read_rows - field_rows, &loop_rows,
                         &field_eof, is_dict_filter));
                 field_rows += loop_rows;
             }
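
Once a page is decoded with the nested filter map, the reader above compacts its cached _rep_levels/_def_levels so they stay aligned with the surviving values for the next (possibly cross-page) call. A small, self-contained sketch of that compaction, assuming plain std types rather than the Doris level_t buffers:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Keep level i only where the value-level (nested) filter map is non-zero,
// mirroring the filtered_rep_levels/filtered_def_levels loop above.
std::vector<int16_t> compact_levels(const std::vector<int16_t>& levels,
                                    const std::vector<uint8_t>& value_filter) {
    std::vector<int16_t> kept;
    kept.reserve(levels.size());
    for (size_t i = 0; i < levels.size(); ++i) {
        if (value_filter[i] != 0) {
            kept.push_back(levels[i]);
        }
    }
    return kept;
}

int main() {
    // Same example as the nested filter map test: rows 0 and 2 survive.
    std::vector<int16_t> rep_levels = {0, 1, 1, 0, 1, 0};
    std::vector<uint8_t> value_filter = {1, 1, 1, 0, 0, 1};
    assert((compact_levels(rep_levels, value_filter) == std::vector<int16_t>{0, 1, 1, 0}));
    return 0;
}
```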
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4c6e5b1eac9..51cdc3dbbd7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -121,8 +121,8 @@ public:
             : _row_ranges(row_ranges), _ctz(ctz), _io_ctx(io_ctx) {}
     virtual ~ParquetColumnReader() = default;
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                                    ColumnSelectVector& select_vector, size_t batch_size,
-                                    size_t* read_rows, bool* eof, bool is_dict_filter) = 0;
+                                    FilterMap& filter_map, size_t batch_size, size_t* read_rows,
+                                    bool* eof, bool is_dict_filter) = 0;
 
     virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) {
         return Status::NotSupported("read_dict_values_to_column is not supported");
@@ -144,6 +144,8 @@ public:
     virtual Statistics statistics() = 0;
     virtual void close() = 0;
 
+    virtual void reset_filter_map_index() = 0;
+
 protected:
     void _generate_read_ranges(int64_t start_index, int64_t end_index,
                                std::list<RowRange>& read_ranges);
@@ -157,6 +159,8 @@ protected:
     int64_t _current_row_index = 0;
     int _row_range_index = 0;
     int64_t _decode_null_map_time = 0;
+
+    size_t _filter_map_index = 0;
 };
 
 class ScalarColumnReader : public ParquetColumnReader {
@@ -171,9 +175,9 @@ public:
               _offset_index(offset_index) {}
     ~ScalarColumnReader() override { close(); }
     Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size);
-    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
+                            size_t batch_size, size_t* read_rows, bool* eof,
+                            bool is_dict_filter) override;
     Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
     MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
     const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
@@ -184,6 +188,11 @@ public:
     }
     void close() override {}
 
+    void reset_filter_map_index() override {
+        _filter_map_index = 0; // nested
+        _orig_filter_map_index = 0;
+    }
+
 private:
     tparquet::ColumnChunk _chunk_meta;
     const tparquet::OffsetIndex* _offset_index;
@@ -192,13 +201,15 @@ private:
     std::vector<level_t> _rep_levels;
     std::vector<level_t> _def_levels;
     std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr;
+    std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr;
+    size_t _orig_filter_map_index = 0;
 
     Status _skip_values(size_t num_values);
     Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
-                        ColumnSelectVector& select_vector, bool is_dict_filter);
-    Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
-                               ColumnSelectVector& select_vector, size_t batch_size,
-                               size_t* read_rows, bool* eof, bool is_dict_filter, bool align_rows);
+                        FilterMap& filter_map, bool is_dict_filter);
+    Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
+                               size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
+                               bool align_rows);
     Status _try_load_dict_page(bool* loaded, bool* has_dict);
 };
 
@@ -210,9 +221,9 @@ public:
             : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~ArrayColumnReader() override { close(); }
     Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
-    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
+                            size_t batch_size, size_t* read_rows, bool* eof,
+                            bool is_dict_filter) override;
     const std::vector<level_t>& get_rep_level() const override {
         return _element_reader->get_rep_level();
     }
@@ -222,6 +233,8 @@ public:
     Statistics statistics() override { return _element_reader->statistics(); }
     void close() override {}
 
+    void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }
+
 private:
     std::unique_ptr<ParquetColumnReader> _element_reader;
 };
@@ -236,9 +249,9 @@ public:
 
     Status init(std::unique_ptr<ParquetColumnReader> key_reader,
                 std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
-    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
+                            size_t batch_size, size_t* read_rows, bool* eof,
+                            bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _key_reader->get_rep_level();
@@ -256,6 +269,11 @@ public:
 
     void close() override {}
 
+    void reset_filter_map_index() override {
+        _key_reader->reset_filter_map_index();
+        _value_reader->reset_filter_map_index();
+    }
+
 private:
     std::unique_ptr<ParquetColumnReader> _key_reader;
     std::unique_ptr<ParquetColumnReader> _value_reader;
@@ -272,9 +290,9 @@ public:
     Status init(
             std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
             FieldSchema* field);
-    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
-                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
+                            size_t batch_size, size_t* read_rows, bool* eof,
+                            bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         if (!_read_column_names.empty()) {
@@ -306,6 +324,12 @@ public:
 
     void close() override {}
 
+    void reset_filter_map_index() override {
+        for (const auto& reader : _child_readers) {
+            reader.second->reset_filter_map_index();
+        }
+    }
+
 private:
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
     std::set<std::string> _read_column_names;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 14dd64097e9..c7ab73e1266 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -317,9 +317,9 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
         // call _do_lazy_read recursively when current batch is skipped
         return _do_lazy_read(block, batch_size, read_rows, batch_eof);
     } else {
-        ColumnSelectVector run_length_vector;
+        FilterMap filter_map;
         RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
-                                          read_rows, batch_eof, run_length_vector));
+                                          read_rows, batch_eof, filter_map));
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
@@ -389,7 +389,7 @@ void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
 
 Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
                                          size_t batch_size, size_t* read_rows, bool* batch_eof,
-                                         ColumnSelectVector& select_vector) {
+                                         FilterMap& filter_map) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
     for (auto& read_col_name : columns) {
@@ -420,11 +420,12 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
         size_t col_read_rows = 0;
         bool col_eof = false;
         // Should reset _filter_map_index to 0 when reading next column.
-        select_vector.reset();
+        //        select_vector.reset();
+        _column_readers[read_col_name]->reset_filter_map_index();
         while (!col_eof && col_read_rows < batch_size) {
             size_t loop_rows = 0;
             RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
-                    column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows,
+                    column_ptr, column_type, filter_map, batch_size - col_read_rows, &loop_rows,
                     &col_eof, is_dict_filter));
             col_read_rows += loop_rows;
         }
@@ -445,7 +446,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
 
 Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
                                      bool* batch_eof) {
-    std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
+    std::unique_ptr<FilterMap> filter_map_ptr = nullptr;
     size_t pre_read_rows;
     bool pre_eof;
     std::vector<uint32_t> columns_to_filter;
@@ -460,9 +461,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
         // read predicate columns
         pre_read_rows = 0;
         pre_eof = false;
-        ColumnSelectVector run_length_vector;
+        FilterMap filter_map;
         RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
-                                          &pre_read_rows, &pre_eof, run_length_vector));
+                                          &pre_read_rows, &pre_eof, filter_map));
         if (pre_read_rows == 0) {
             DCHECK_EQ(pre_eof, true);
             break;
@@ -504,9 +505,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             block->get_by_position(0).column->assume_mutable()->clear();
         }
 
-        const uint8_t* __restrict filter_map = result_filter.data();
-        select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
-        if (select_vector_ptr->filter_all()) {
+        const uint8_t* __restrict filter_map_data = result_filter.data();
+        filter_map_ptr.reset(new FilterMap());
+        RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all));
+        if (filter_map_ptr->filter_all()) {
             for (auto& col : _lazy_read_ctx.predicate_columns.first) {
                 // clean block to read predicate columns
                 block->get_by_name(col).column->assume_mutable()->clear();
@@ -528,7 +530,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
                     return Status::OK();
                 }
             } else { // pre_eof
-                // If select_vector_ptr->filter_all() and pre_eof, we can skip whole row group.
+                // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group.
                 *read_rows = 0;
                 *batch_eof = true;
                 _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows);
@@ -543,17 +545,17 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
         return Status::Cancelled("cancelled");
     }
 
-    if (select_vector_ptr == nullptr) {
+    if (filter_map_ptr == nullptr) {
         DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
         *read_rows = 0;
         *batch_eof = true;
         return Status::OK();
     }
 
-    ColumnSelectVector& select_vector = *select_vector_ptr;
+    FilterMap& filter_map = *filter_map_ptr;
     std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
     if (_cached_filtered_rows != 0) {
-        _rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows);
+        RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
         pre_read_rows += _cached_filtered_rows;
         _cached_filtered_rows = 0;
     }
@@ -562,7 +564,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     size_t lazy_read_rows;
     bool lazy_eof;
     RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
-                                      &lazy_read_rows, &lazy_eof, select_vector));
+                                      &lazy_read_rows, &lazy_eof, filter_map));
+
     if (pre_read_rows != lazy_read_rows) {
         return Status::Corruption("Can't read the same number of rows when doing lazy read");
     }
@@ -570,7 +573,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof
 
     // filter data in predicate columns, and remove filter column
-    if (select_vector.has_filter()) {
+    if (filter_map.has_filter()) {
         if (block->columns() == origin_column_num) {
             // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is
             // generated from next batch, so the filter column is removed ahead.
@@ -615,24 +618,24 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     return Status::OK();
 }
 
-void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
-                                            std::unique_ptr<uint8_t[]>& filter_map,
-                                            size_t pre_read_rows) const {
+Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
+                                           std::unique_ptr<uint8_t[]>& filter_map_data,
+                                           size_t pre_read_rows) const {
     if (_cached_filtered_rows == 0) {
-        return;
+        return Status::OK();
     }
     size_t total_rows = _cached_filtered_rows + pre_read_rows;
-    if (select_vector.filter_all()) {
-        select_vector.build(nullptr, total_rows, true);
-        return;
+    if (filter_map.filter_all()) {
+        RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true));
+        return Status::OK();
     }
 
     uint8_t* map = new uint8_t[total_rows];
-    filter_map.reset(map);
+    filter_map_data.reset(map);
     for (size_t i = 0; i < _cached_filtered_rows; ++i) {
         map[i] = 0;
     }
-    const uint8_t* old_map = select_vector.filter_map();
+    const uint8_t* old_map = filter_map.filter_map_data();
     if (old_map == nullptr) {
         // select_vector.filter_all() == true is already built.
         for (size_t i = _cached_filtered_rows; i < total_rows; ++i) {
@@ -641,7 +644,8 @@ void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
     } else {
         memcpy(map + _cached_filtered_rows, old_map, pre_read_rows);
     }
-    select_vector.build(map, total_rows, false);
+    RETURN_IF_ERROR(filter_map.init(map, total_rows, false));
+    return Status::OK();
 }
 
 Status RowGroupReader::_fill_partition_columns(
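
The _rebuild_filter_map change above keeps the lazy-read columns aligned with the predicate columns across batches: rows that were filtered out and cached from the previous batch are prepended as zero entries before the current batch's filter values. A hedged, self-contained sketch with plain std types (the real code fills a raw uint8_t[] owned by a unique_ptr):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Prepend `cached_filtered_rows` zero entries (rows already known to be
// filtered out) in front of the current batch's filter map.
std::vector<uint8_t> rebuild_filter_map(const std::vector<uint8_t>& current,
                                        size_t cached_filtered_rows) {
    std::vector<uint8_t> rebuilt(cached_filtered_rows, 0);
    rebuilt.insert(rebuilt.end(), current.begin(), current.end());
    return rebuilt;
}

int main() {
    assert((rebuild_filter_map({1, 0, 1}, 2) == std::vector<uint8_t>{0, 0, 1, 0, 1}));
    return 0;
}
```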
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index a889c1774ea..f73e9ebe09e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -175,10 +175,10 @@ private:
     Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof);
     Status _read_column_data(Block* block, const std::vector<std::string>& columns,
                              size_t batch_size, size_t* read_rows, bool* batch_eof,
-                             ColumnSelectVector& select_vector);
+                             FilterMap& filter_map);
     Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
-    void _rebuild_select_vector(ColumnSelectVector& select_vector,
-                                std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows) const;
+    Status _rebuild_filter_map(FilterMap& filter_map, std::unique_ptr<uint8_t[]>& filter_map_data,
+                               size_t pre_read_rows) const;
     Status _fill_partition_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index ce3a1bbea84..f5e3e6fdcd5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -477,8 +477,7 @@ Status ParquetReader::set_fill_columns(
         }
     }
 
-    if (!_lazy_read_ctx.has_complex_type && _enable_lazy_mat &&
-        _lazy_read_ctx.predicate_columns.first.size() > 0 &&
+    if (_enable_lazy_mat && _lazy_read_ctx.predicate_columns.first.size() > 0 &&
         _lazy_read_ctx.lazy_read_columns.size() > 0) {
         _lazy_read_ctx.can_lazy_read = true;
     }
diff --git a/be/test/vec/exec/parquet/parquet_common_test.cpp b/be/test/vec/exec/parquet/parquet_common_test.cpp
new file mode 100644
index 00000000000..06715362937
--- /dev/null
+++ b/be/test/vec/exec/parquet/parquet_common_test.cpp
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/parquet_common.h"
+
+#include <gtest/gtest.h>
+
+namespace doris::vectorized {
+
+// ============= FilterMap Tests =============
+class FilterMapTest : public testing::Test {
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+// Basic initialization test
+TEST_F(FilterMapTest, test_basic_init) {
+    std::vector<uint8_t> filter_data = {1, 0, 1, 0};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    EXPECT_TRUE(filter_map.has_filter());
+    EXPECT_FALSE(filter_map.filter_all());
+    EXPECT_EQ(filter_map.filter_map_size(), 4);
+    EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.5);
+}
+
+// Empty filter test
+TEST_F(FilterMapTest, test_empty_filter) {
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(nullptr, 0, false).ok());
+
+    EXPECT_FALSE(filter_map.has_filter());
+    EXPECT_FALSE(filter_map.filter_all());
+    EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.0);
+}
+
+// Test filter all
+TEST_F(FilterMapTest, test_filter_all) {
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(nullptr, 0, true).ok());
+
+    EXPECT_TRUE(filter_map.has_filter());
+    EXPECT_TRUE(filter_map.filter_all());
+    EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 1.0);
+}
+
+// Test all zero filter
+TEST_F(FilterMapTest, test_all_zero_filter) {
+    std::vector<uint8_t> filter_data(100, 0); // Large data test
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    EXPECT_TRUE(filter_map.has_filter());
+    EXPECT_TRUE(filter_map.filter_all());
+    EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 1.0);
+}
+
+// Test all one filter
+TEST_F(FilterMapTest, test_all_one_filter) {
+    std::vector<uint8_t> filter_data(100, 1);
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    EXPECT_FALSE(filter_map.has_filter());
+    EXPECT_FALSE(filter_map.filter_all());
+    EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.0);
+}
+
+// Basic nested filter map generation test
+TEST_F(FilterMapTest, test_generate_nested_filter_map_basic) {
+    std::vector<uint8_t> filter_data = {1, 0, 1};
+    std::vector<level_t> rep_levels = {0, 1, 1, 0, 1, 0};
+
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+    size_t current_row = 0;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, 0)
+                        .ok());
+
+    std::vector<uint8_t> expected = {1, 1, 1, 0, 0, 1};
+    EXPECT_EQ(nested_filter_map_data, expected);
+    EXPECT_EQ(current_row, 2);
+}
+
+// Empty rep_levels test
+TEST_F(FilterMapTest, test_generate_nested_filter_map_empty_rep_levels) {
+    std::vector<uint8_t> filter_data = {1, 0, 1};
+    std::vector<level_t> rep_levels;
+
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+    size_t current_row = 0;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, 0)
+                        .ok());
+
+    EXPECT_TRUE(nested_filter_map_data.empty());
+    EXPECT_EQ(current_row, 0);
+}
+
+// Test nested filter map generation with start index
+TEST_F(FilterMapTest, test_generate_nested_filter_map_with_start_index) {
+    std::vector<uint8_t> filter_data = {1, 0, 1};
+    std::vector<level_t> rep_levels = {0, 1, 1, 0, 1, 0};
+    // rep_levels grouped by row: 0 1 1 | 0 1 | 0
+    // per-row filter flags expanded over those values: 1 1 1 | 0 0 | 1
+
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+    size_t current_row = 1;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, 3)
+                        .ok());
+
+    std::vector<uint8_t> expected(6); // Initialize with zeros
+    expected[5] = 1;                  // Last value should be 1
+    EXPECT_EQ(nested_filter_map_data, expected);
+    EXPECT_EQ(current_row, 2);
+}
+
+// Test filter map boundary check
+TEST_F(FilterMapTest, test_generate_nested_filter_map_boundary) {
+    std::vector<uint8_t> filter_data = {1};
+    std::vector<level_t> rep_levels = {0, 1, 1, 0}; // Needs 2 rows but 
filter_data only has 1
+
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+    size_t current_row = 0;
+
+    // Should return error
+    auto status = filter_map.generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                        &nested_filter_map, 
&current_row, 0);
+    EXPECT_FALSE(status.ok());
+}
+
+// Test can_filter_all functionality
+TEST_F(FilterMapTest, test_can_filter_all) {
+    std::vector<uint8_t> filter_data = {0, 0, 1, 0, 0, 1, 0};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    EXPECT_TRUE(filter_map.can_filter_all(2, 0));  // First two are 0
+    EXPECT_FALSE(filter_map.can_filter_all(3, 0)); // First three include 1
+    EXPECT_TRUE(filter_map.can_filter_all(2, 3));  // Two values starting at 
index 3 are 0
+    EXPECT_FALSE(filter_map.can_filter_all(2, 5)); // Index 5 contains 1
+    EXPECT_TRUE(filter_map.can_filter_all(1, 6));  // Last value is 0
+}
+
+// Test can_filter_all when filter_all is true
+TEST_F(FilterMapTest, test_can_filter_all_when_filter_all) {
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(nullptr, 100, true).ok());
+
+    EXPECT_TRUE(filter_map.can_filter_all(50, 0));
+    EXPECT_TRUE(filter_map.can_filter_all(100, 0));
+}
+
+class CrossPageTest : public testing::Test {
+protected:
+    void SetUp() override {
+        filter_data = {1, 0, 1, 0, 1};
+
+        // page 1: all of row 1 plus the start of row 2; expected nested filter: 1111 00
+        page1_rep_levels = {0, 1, 1, 1, 0, 1};
+        // page 2: rest of row 2, then rows 3-5; expected nested filter: 00 11 000 1
+        page2_rep_levels = {1, 1, 0, 1, 0, 1, 1, 0};
+    }
+
+    std::vector<uint8_t> filter_data;
+    std::vector<level_t> page1_rep_levels;
+    std::vector<level_t> page2_rep_levels;
+};
+
+TEST_F(CrossPageTest, test_basic1) {
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+    size_t current_row = 0;
+    std::vector<level_t> rep_levels;
+    rep_levels.insert(rep_levels.end(), page1_rep_levels.begin(), 
page1_rep_levels.end());
+    rep_levels.insert(rep_levels.end(), page2_rep_levels.begin(), 
page2_rep_levels.end());
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, 0)
+                        .ok());
+
+    std::vector<uint8_t> expected = {1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1};
+
+    EXPECT_EQ(nested_filter_map_data, expected);
+
+    EXPECT_EQ(current_row, 4);
+}
+
+TEST_F(CrossPageTest, test_basic2) {
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_filter_map_data;
+    std::unique_ptr<FilterMap> nested_filter_map;
+
+    size_t current_row = 0;
+    std::vector<level_t> rep_levels;
+    rep_levels.insert(rep_levels.end(), page1_rep_levels.begin(), 
page1_rep_levels.end());
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, 0)
+                        .ok());
+    std::vector<uint8_t> expected1 = {1, 1, 1, 1, 0, 0};
+
+    EXPECT_EQ(nested_filter_map_data, expected1);
+    EXPECT_EQ(current_row, 1);
+
+    rep_levels.insert(rep_levels.end(), page2_rep_levels.begin(), 
page2_rep_levels.end());
+
+    size_t start_index = page1_rep_levels.size();
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(rep_levels, 
nested_filter_map_data,
+                                                    &nested_filter_map, 
&current_row, start_index)
+                        .ok());
+
+    std::vector<uint8_t> expected2 = {1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 
1};
+
+    EXPECT_EQ(nested_filter_map_data, expected2);
+    EXPECT_EQ(current_row, 4);
+}
+
+// ============= ColumnSelectVector Tests =============
+class ColumnSelectVectorTest : public testing::Test {
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+// Basic initialization test
+TEST_F(ColumnSelectVectorTest, test_basic_init) {
+    std::vector<uint16_t> run_length_null_map = {2, 1, 3}; // 2 non-null, 1 
null, 3 non-null
+    std::vector<uint8_t> filter_data = {1, 0, 1, 0, 1, 0};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    EXPECT_TRUE(select_vector.has_filter());
+    EXPECT_EQ(select_vector.num_values(), 6);
+    EXPECT_EQ(select_vector.num_nulls(), 1);
+    EXPECT_EQ(select_vector.num_filtered(), 3);
+}
+
+// Test initialization without null map
+TEST_F(ColumnSelectVectorTest, test_init_without_null_map) {
+    std::vector<uint16_t> run_length_null_map = {2, 1, 3};
+    std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, nullptr, 
&filter_map, 0).ok());
+
+    EXPECT_EQ(select_vector.num_nulls(), 1);
+    EXPECT_EQ(select_vector.num_filtered(), 0);
+}
+
+// Test all null values
+TEST_F(ColumnSelectVectorTest, test_all_null) {
+    std::vector<uint16_t> run_length_null_map = {0, 6}; // All null
+    std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    EXPECT_EQ(select_vector.num_nulls(), 6);
+    EXPECT_EQ(select_vector.num_filtered(), 0);
+
+    // Verify null_map
+    EXPECT_EQ(null_map.size(), 6);
+    for (size_t i = 0; i < 6; i++) {
+        EXPECT_EQ(null_map[i], 1);
+    }
+}
+
+// Test no null values
+TEST_F(ColumnSelectVectorTest, test_no_null) {
+    std::vector<uint16_t> run_length_null_map = {6}; // All non-null
+    std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    EXPECT_EQ(select_vector.num_nulls(), 0);
+    EXPECT_EQ(select_vector.num_filtered(), 0);
+
+    // Verify null_map
+    EXPECT_EQ(null_map.size(), 6);
+    for (size_t i = 0; i < 6; i++) {
+        EXPECT_EQ(null_map[i], 0);
+    }
+}
+
+// Test get_next_run with filter
+TEST_F(ColumnSelectVectorTest, test_get_next_run_with_filter) {
+    std::vector<uint16_t> run_length_null_map = {2, 1, 3}; // 1, 1, 0, 1, 1, 1
+    std::vector<uint8_t> filter_data = {1, 1, 0, 1, 1, 0};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    ColumnSelectVector::DataReadType type;
+
+    // Verify read sequence
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 2);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 2);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::FILTERED_CONTENT);
+}
+
+// Test get_next_run without filter
+TEST_F(ColumnSelectVectorTest, test_get_next_run_without_filter) {
+    std::vector<uint16_t> run_length_null_map = {2, 1, 3};
+    std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(nullptr, 0, false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    ColumnSelectVector::DataReadType type;
+
+    // Verify read sequence
+    EXPECT_EQ(select_vector.get_next_run<false>(&type), 2);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<false>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::NULL_DATA);
+
+    EXPECT_EQ(select_vector.get_next_run<false>(&type), 3);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<false>(&type), 0);
+}
+
+// Test complex null pattern
+TEST_F(ColumnSelectVectorTest, test_complex_null_pattern) {
+    // Alternating null and non-null values
+    std::vector<uint16_t> run_length_null_map = {1, 1, 1, 1, 1, 1}; // 1, 0, 
1, 0, 1, 0
+    std::vector<uint8_t> filter_data = {1, 0, 1, 0, 1, 0};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, 
&filter_map, 0).ok());
+
+    EXPECT_EQ(select_vector.num_nulls(), 3);
+    EXPECT_EQ(select_vector.num_filtered(), 3);
+
+    ColumnSelectVector::DataReadType type;
+
+    // Verify alternating read pattern
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL);
+}
+
+// Test filter_map_index
+TEST_F(ColumnSelectVectorTest, test_filter_map_index) {
+    std::vector<uint16_t> run_length_null_map = {0, 1, 3}; // 0, 1, 1, 1
+    std::vector<uint8_t> filter_data = {0, 0, 1, 1, 1, 1};
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    ColumnSelectVector select_vector;
+    NullMap null_map;
+    ASSERT_TRUE(select_vector.init(run_length_null_map, 4, &null_map, 
&filter_map, 2).ok());
+
+    EXPECT_EQ(select_vector.num_filtered(), 0);
+
+    ColumnSelectVector::DataReadType type;
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 1);
+    EXPECT_EQ(type, ColumnSelectVector::NULL_DATA);
+
+    EXPECT_EQ(select_vector.get_next_run<true>(&type), 3);
+    EXPECT_EQ(type, ColumnSelectVector::CONTENT);
+}
+
+} // namespace doris::vectorized
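
Taken together, the FilterMap tests pin down how a row-level filter is expanded to one flag per leaf value: a rep_level of 0 starts a new row, and every value inherits its row's flag. Below is a sketch of that rule, inferred from the expected outputs above rather than copied from the production generate_nested_filter_map (which additionally threads current_row and a start index so the expansion can resume across pages):

    #include <cstddef>
    #include <cstdint>
    #include <stdexcept>
    #include <vector>

    using level_t = int16_t; // assumption: an integral level type, as in the tests

    std::vector<uint8_t> expand_row_filter(const std::vector<uint8_t>& row_filter,
                                           const std::vector<level_t>& rep_levels) {
        std::vector<uint8_t> nested;
        nested.reserve(rep_levels.size());
        size_t row = 0;
        for (size_t i = 0; i < rep_levels.size(); ++i) {
            if (rep_levels[i] == 0 && i != 0) {
                ++row; // rep_level 0 marks the first value of the next row
            }
            if (row >= row_filter.size()) {
                throw std::out_of_range("rep_levels describe more rows than the filter covers");
            }
            nested.push_back(row_filter[row]); // every value inherits its row's flag
        }
        return nested;
    }

With row_filter = {1, 0, 1} and rep_levels = {0,1,1, 0,1, 0} this yields {1,1,1, 0,0, 1}, the expectation in test_generate_nested_filter_map_basic; the boundary test above corresponds to the out-of-range branch.
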
diff --git a/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp 
b/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp
new file mode 100644
index 00000000000..cff0937910e
--- /dev/null
+++ b/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "vec/exec/format/parquet/parquet_common.h"
+
+namespace doris::vectorized {
+class ParquetCrossPageTest : public testing::Test {
+protected:
+    void SetUp() override {
+        // filter_data is row-level, corresponding to 8 rows
+        filter_data = {1, 0, 0, 1, 0, 0, 1, 1}; // filter conditions for 8 rows
+
+        // Page 1: 2 complete rows + 1 incomplete row
+        page1_rep_levels = {
+                0, 1, 1, // Row 1: 3 elements
+                0, 1,    // Row 2: 2 elements
+                0, 1, 1  // Row 3: first 3 elements, continued on page 2
+        };
+
+        // Page 2: continue Row 3 + 2 complete rows + 1 incomplete row
+        page2_rep_levels = {
+                1, 1,    // Row 3 continued: last 2 elements
+                0, 1,    // Row 4: 2 elements
+                0, 1, 1, // Row 5: 3 elements
+                0, 1     // Row 6: first 2 elements, continued on page 3
+        };
+
+        // Page 3: continue Row 6 + 2 complete rows
+        page3_rep_levels = {
+                1, 1,    // Row 6 continued: last 2 elements
+                0, 1, 1, // Row 7: 3 elements
+                0, 1     // Row 8: 2 elements
+        };
+    }
+
+    std::vector<uint8_t> filter_data; // Row-level filter conditions for all 
data
+    std::vector<level_t> page1_rep_levels;
+    std::vector<level_t> page2_rep_levels;
+    std::vector<level_t> page3_rep_levels;
+};
+
+// Test complete processing of three pages
+TEST_F(ParquetCrossPageTest, test_three_pages_complete) {
+    size_t current_row = 0;
+
+    // Process first page
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), 
false).ok());
+
+    std::vector<uint8_t> nested_data1;
+    std::unique_ptr<FilterMap> nested_map1;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(page1_rep_levels, 
nested_data1, &nested_map1,
+                                                    &current_row, 0)
+                        .ok());
+
+    // Verify first page results - using corresponding rows from overall 
filter_data
+    std::vector<uint8_t> expected1 = {1, 1, 1, 0, 0, 0, 0, 0};
+    EXPECT_EQ(nested_data1, expected1);
+    EXPECT_EQ(current_row, 2); // Processed up to Row 3
+
+    // Process second page - continue with same filter_map
+    std::vector<uint8_t> nested_data2;
+    std::unique_ptr<FilterMap> nested_map2;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(page2_rep_levels, 
nested_data2, &nested_map2,
+                                                    &current_row, 0)
+                        .ok());
+
+    // Verify second page results
+    std::vector<uint8_t> expected2 = {0, 0, 1, 1, 0, 0, 0, 0, 0};
+    EXPECT_EQ(nested_data2, expected2);
+    EXPECT_EQ(current_row, 5); // Processed up to Row 6
+
+    // Process third page - continue with same filter_map
+    std::vector<uint8_t> nested_data3;
+    std::unique_ptr<FilterMap> nested_map3;
+
+    ASSERT_TRUE(filter_map
+                        .generate_nested_filter_map(page3_rep_levels, 
nested_data3, &nested_map3,
+                                                    &current_row, 0)
+                        .ok());
+
+    // Verify third page results
+    std::vector<uint8_t> expected3 = {0, 0, 1, 1, 1, 1, 1};
+    EXPECT_EQ(nested_data3, expected3);
+    EXPECT_EQ(current_row, 7); // Processed all 8 rows
+}
+
+// Test case where a single row spans three pages
+TEST_F(ParquetCrossPageTest, test_single_row_across_three_pages) {
+    // Filter for one long array row
+    std::vector<uint8_t> row_filter = {1, 0}; // Only 2 rows of data
+
+    // First page
+    std::vector<level_t> rep1 = {
+            0,       // Start of first row
+            1, 1, 1, // 3 array elements
+            1, 1     // To be continued...
+    };
+
+    // Second page
+    std::vector<level_t> rep2 = {
+            1, 1, 1, // Continue with 3 array elements
+            1, 1, 1  // To be continued...
+    };
+
+    // Third page
+    std::vector<level_t> rep3 = {
+            1, 1, 1, // Last 3 array elements
+            0        // Start of second row
+    };
+
+    size_t current_row = 0;
+
+    // Use same filter_map for all pages
+    FilterMap filter_map;
+    ASSERT_TRUE(filter_map.init(row_filter.data(), row_filter.size(), 
false).ok());
+
+    // Process first page
+    std::vector<uint8_t> nested_data1;
+    std::unique_ptr<FilterMap> nested_map1;
+
+    ASSERT_TRUE(
+            filter_map.generate_nested_filter_map(rep1, nested_data1, 
&nested_map1, &current_row, 0)
+                    .ok());
+
+    // Verify first page results
+    std::vector<uint8_t> expected1(rep1.size(), 1); // All use first row's 
filter value
+    EXPECT_EQ(nested_data1, expected1);
+    EXPECT_EQ(current_row, 0); // Still in first row
+
+    // Process second page
+    std::vector<uint8_t> nested_data2;
+    std::unique_ptr<FilterMap> nested_map2;
+
+    ASSERT_TRUE(
+            filter_map.generate_nested_filter_map(rep2, nested_data2, 
&nested_map2, &current_row, 0)
+                    .ok());
+
+    // Verify second page results
+    std::vector<uint8_t> expected2(rep2.size(), 1); // Still using first row's 
filter value
+    EXPECT_EQ(nested_data2, expected2);
+    EXPECT_EQ(current_row, 0); // Still in first row
+
+    // Process third page
+    std::vector<uint8_t> nested_data3;
+    std::unique_ptr<FilterMap> nested_map3;
+
+    ASSERT_TRUE(
+            filter_map.generate_nested_filter_map(rep3, nested_data3, 
&nested_map3, &current_row, 0)
+                    .ok());
+
+    // Verify third page results
+    std::vector<uint8_t> expected3(rep3.size(), 1);
+    expected3.back() = 0; // Last element uses second row's filter value
+    EXPECT_EQ(nested_data3, expected3);
+    EXPECT_EQ(current_row, 1); // Moved to second row
+}
+
+} // namespace doris::vectorized
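
The cross-page fixtures establish the contract for multi-page rows: one FilterMap and one current_row are shared across the per-page calls, so a row that is split across pages receives a single keep-or-drop decision. A sketch of that calling pattern, distilled from the tests above (the generate_nested_filter_map signature is taken from them; error handling is simplified):

    Status filter_pages(FilterMap& filter_map,
                        const std::vector<std::vector<level_t>>& pages) {
        size_t current_row = 0;
        for (const auto& page_rep_levels : pages) {
            std::vector<uint8_t> nested_data;
            std::unique_ptr<FilterMap> nested_map;
            // current_row carries the in-progress row from one page to the next,
            // which is what keeps a row spanning pages consistently filtered.
            RETURN_IF_ERROR(filter_map.generate_nested_filter_map(
                    page_rep_levels, nested_data, &nested_map, &current_row, 0));
            // nested_map now holds one flag per value decoded from this page.
        }
        return Status::OK();
    }
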
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 19b21c16a45..b792704bd0e 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -229,12 +229,14 @@ static Status get_column_values(io::FileReaderSPtr 
file_reader, tparquet::Column
     } else {
         data_column = src_column->assume_mutable();
     }
+    FilterMap filter_map;
+    RETURN_IF_ERROR(filter_map.init(nullptr, 0, false));
     ColumnSelectVector run_length_map;
     // decode page data
     if (field_schema->definition_level == 0) {
         // required column
         std::vector<u_short> null_map = {(u_short)rows};
-        run_length_map.set_run_length_null_map(null_map, rows, nullptr);
+        RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, 
&filter_map, 0));
         RETURN_IF_ERROR(
                 chunk_reader.decode_values(data_column, resolved_type, 
run_length_map, false));
     } else {
@@ -248,7 +250,7 @@ static Status get_column_values(io::FileReaderSPtr 
file_reader, tparquet::Column
                     chunk_reader.insert_null_values(data_column, num_values);
                 } else {
                     std::vector<u_short> null_map = {(u_short)num_values};
-                    run_length_map.set_run_length_null_map(null_map, rows, 
nullptr);
+                    RETURN_IF_ERROR(run_length_map.init(null_map, rows, 
nullptr, &filter_map, 0));
                     RETURN_IF_ERROR(chunk_reader.decode_values(data_column, 
resolved_type,
                                                                run_length_map, 
false));
                 }
@@ -263,7 +265,7 @@ static Status get_column_values(io::FileReaderSPtr 
file_reader, tparquet::Column
             chunk_reader.insert_null_values(data_column, num_values);
         } else {
             std::vector<u_short> null_map = {(u_short)num_values};
-            run_length_map.set_run_length_null_map(null_map, rows, nullptr);
+            RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, 
&filter_map, 0));
             RETURN_IF_ERROR(
                     chunk_reader.decode_values(data_column, resolved_type, 
run_length_map, false));
         }
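
For reference, the call-site pattern that replaces set_run_length_null_map, as it appears in the hunks above: a FilterMap must now be initialized first (even a "no filter" one), and both init calls report failures through Status.

    FilterMap filter_map;
    RETURN_IF_ERROR(filter_map.init(nullptr, 0, false)); // no row filter applies here

    ColumnSelectVector run_length_map;
    // one run covering all values: every value is non-null
    std::vector<u_short> null_map = {(u_short)rows};
    RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, &filter_map, 0));
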
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql
new file mode 100644
index 00000000000..863595278f3
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql
@@ -0,0 +1,58 @@
+CREATE DATABASE IF NOT EXISTS multi_catalog;
+USE multi_catalog;
+
+CREATE TABLE `nested_cross_page1_parquet`(
+    `id` int,
+    `array_col` array<int>,
+    `description` string)
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+LOCATION
+  '/user/doris/suites/multi_catalog/nested_cross_page1_parquet';
+
+msck repair table nested_cross_page1_parquet;
+
+CREATE TABLE `nested_cross_page2_parquet`(
+    id INT,
+    nested_array_col ARRAY<ARRAY<INT>>,
+    array_struct_col ARRAY<STRUCT<x:INT, y:STRING>>,
+    map_array_col MAP<STRING, ARRAY<INT>>,
+    complex_struct_col STRUCT<
+        a: ARRAY<INT>,
+        b: MAP<STRING, ARRAY<INT>>,
+        c: STRUCT<
+            x: ARRAY<INT>,
+            y: STRING
+        >
+    >,
+    description STRING)
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+LOCATION
+  '/user/doris/suites/multi_catalog/nested_cross_page2_parquet';
+
+msck repair table nested_cross_page2_parquet;
+
+CREATE TABLE `nested_cross_page3_parquet`(
+    `id` int,
+    `array_col` array<int>,
+    `description` string)
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+STORED AS INPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+OUTPUTFORMAT
+  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+LOCATION
+  '/user/doris/suites/multi_catalog/nested_cross_page3_parquet';
+
+msck repair table nested_cross_page3_parquet;
+
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz
new file mode 100644
index 00000000000..ce0d98f7bfc
Binary files /dev/null and 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz
 differ
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py
new file mode 100644
index 00000000000..9fff6d362cd
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py
@@ -0,0 +1,192 @@
+import pyarrow as pa
+import pyarrow.parquet as pq
+import subprocess
+import argparse
+import json
+
+# Define the output file path as a constant
+OUTPUT_PARQUET_FILE = 'nested_cross_page_test1.parquet'
+
+def generate_cross_page_test_data(output_file):
+    # Create test data
+    data = {
+        # id column (INT32)
+        'id': [1, 2, 3],
+        
+        # array column (ARRAY<INT>)
+        'array_col': [
+            # Row 1 - Large array to force cross-page
+            [1, 2, 3, 4, 5] * 200,  # 1000 elements
+            
+            # Row 2 - Normal sized array
+            [1, 2, 3],
+            
+            # Row 3 - Another large array
+            [6, 7, 8, 9, 10] * 200  # 1000 elements
+        ],
+        
+        # description column (STRING)
+        'description': [
+            'This is a large array with repeated sequence [1,2,3,4,5]',
+            'This is a small array with just three elements',
+            'This is another large array with repeated sequence [6,7,8,9,10]'
+        ]
+    }
+    
+    # Create table structure
+    table = pa.Table.from_pydict({
+        'id': pa.array(data['id'], type=pa.int32()),
+        'array_col': pa.array(data['array_col'], type=pa.list_(pa.int32())),
+        'description': pa.array(data['description'], type=pa.string())
+    })
+    
+    # Write to parquet file
+    pq.write_table(
+        table,
+        output_file,
+        compression=None,  # No compression for predictable size
+        version='2.6',
+        write_statistics=True,
+        row_group_size=3,        # All data in one row group
+        data_page_size=100,      # Very small page size
+        write_batch_size=10      # Small batch size
+    )
+
+def inspect_parquet_file(file_path):
+    """Inspect the structure of generated parquet file"""
+    pf = pq.ParquetFile(file_path)
+    print(f"\nFile: {file_path}")
+    print(f"Number of row groups: {pf.num_row_groups}")
+    
+    metadata = pf.metadata
+    schema = pf.schema
+    print(f"\nSchema: {schema}")
+    print(f"\nDetailed schema:")
+    for i in range(len(schema)):
+        print(f"Column {i}: {schema[i]}")
+    
+    for i in range(metadata.num_row_groups):
+        rg = metadata.row_group(i)
+        print(f"\nRow Group {i}:")
+        print(f"Num rows: {rg.num_rows}")
+        
+        for j in range(rg.num_columns):
+            col = rg.column(j)
+            print(f"\nColumn {j}:")
+            print(f"Path: {schema[j].name}")
+            print(f"Type: {col.physical_type}")
+            print(f"Encodings: {col.encodings}")
+            print(f"Total compressed size: {col.total_compressed_size}")
+            print(f"Total uncompressed size: {col.total_uncompressed_size}")
+            print(f"Number of values: {col.num_values}")
+            print(f"Data page offset: {col.data_page_offset}")
+            if col.dictionary_page_offset is not None:
+                print(f"Dictionary page offset: {col.dictionary_page_offset}")
+
+def read_and_print_file(file_path):
+    """Read and print file content"""
+    table = pq.read_table(file_path)
+    df = table.to_pandas()
+    print("\nFile content:")
+    for i in range(len(df)):
+        print(f"\nRow {i}:")
+        print(f"ID: {df.iloc[i]['id']}")
+        arr = df.iloc[i]['array_col']
+        print(f"Array length: {len(arr)}")
+        print(f"First few elements: {arr[:5]}...")
+        print(f"Last few elements: ...{arr[-5:]}")
+        print(f"Description: {df.iloc[i]['description']}")
+
+def inspect_pages_with_cli(file_path, parquet_cli_path=None):
+    """
+    Inspect page information using parquet-cli
+    
+    Args:
+        file_path: Path to the parquet file
+        parquet_cli_path: Optional path to parquet-cli jar file
+    """
+    if not parquet_cli_path:
+        print("\nSkipping parquet-cli inspection: No parquet-cli path 
provided")
+        return
+
+    print("\nParquet CLI Output:")
+    try:
+        cmd = f"java -jar {parquet_cli_path} pages {file_path}"
+        result = subprocess.run(cmd, shell=True, check=True, 
capture_output=True, text=True)
+        print(result.stdout)
+    except subprocess.CalledProcessError as e:
+        print(f"Error running parquet-cli: {e}")
+        if e.output:
+            print(f"Error output: {e.output}")
+    except Exception as e:
+        print(f"Unexpected error running parquet-cli: {e}")
+
+def save_test_data_info(output_file):
+    """Save detailed test data information to text file"""
+    info = {
+        "file_format": "Parquet",
+        "version": "2.6",
+        "compression": "None",
+        "row_group_size": 3,
+        "data_page_size": 100,
+        "write_batch_size": 10,
+        "output_file": output_file,
+        "schema": {
+            "id": "INT32",
+            "array_col": "ARRAY<INT32>",
+            "description": "STRING"
+        },
+        "test_cases": [
+            {
+                "row": 1,
+                "description": "Large array",
+                "characteristics": [
+                    "1000 elements",
+                    "Repeated sequence [1,2,3,4,5]",
+                    "Forces cross-page scenario"
+                ]
+            },
+            {
+                "row": 2,
+                "description": "Small array",
+                "characteristics": [
+                    "3 elements",
+                    "Simple sequence [1,2,3]",
+                    "Fits in single page"
+                ]
+            },
+            {
+                "row": 3,
+                "description": "Another large array",
+                "characteristics": [
+                    "1000 elements",
+                    "Repeated sequence [6,7,8,9,10]",
+                    "Forces cross-page scenario"
+                ]
+            }
+        ]
+    }
+    
+    info_file = output_file.replace('.parquet', '_info.json')
+    with open(info_file, 'w') as f:
+        json.dump(info, f, indent=2)
+
+if __name__ == '__main__':
+    # Add command line argument parsing
+    parser = argparse.ArgumentParser(description='Generate and inspect parquet 
test data')
+    parser.add_argument('--parquet-cli', 
+                       help='Path to parquet-cli jar file',
+                       default=None)
+    parser.add_argument('--output',
+                       help='Output parquet file path',
+                       default=OUTPUT_PARQUET_FILE)
+    args = parser.parse_args()
+
+    # Use the output file path from command line or default
+    output_file = args.output
+
+    generate_cross_page_test_data(output_file)
+    inspect_parquet_file(output_file)
+    read_and_print_file(output_file)
+    inspect_pages_with_cli(output_file, args.parquet_cli)
+    save_test_data_info(output_file)
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py
new file mode 100644
index 00000000000..2e67e962131
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py
@@ -0,0 +1,287 @@
+import pyarrow as pa
+import pyarrow.parquet as pq
+import subprocess
+import json
+import argparse
+
+# Define the output file path as a constant
+OUTPUT_PARQUET_FILE = 'nested_cross_page_test2.parquet'
+
+def generate_cross_page_test_data(output_file):
+    # Create test data
+    data = {
+        # id column (INT32)
+        'id': [1, 2, 3],
+        
+        # Multi-level array column (ARRAY<ARRAY<INT>>)
+        'nested_array_col': [
+            # Row 1 - Large array
+            [[i for i in range(10)] for _ in range(100)],  # 100 sub-arrays, 
each with 10 elements
+            
+            # Row 2 - Small array
+            [[1, 2], [3, 4], [5, 6]],
+            
+            # Row 3 - Another large array
+            [[i for i in range(10, 20)] for _ in range(100)]
+        ],
+        
+        # Struct array column (ARRAY<STRUCT<x: int, y: string>>)
+        'array_struct_col': [
+            # Row 1
+            [{'x': i, 'y': f'value_{i}'} for i in range(500)],
+            
+            # Row 2
+            [{'x': 1, 'y': 'small'}, {'x': 2, 'y': 'array'}],
+            
+            # Row 3
+            [{'x': i, 'y': f'big_{i}'} for i in range(500)]
+        ],
+        
+        # Map column (MAP<STRING, ARRAY<INT>>)
+        'map_array_col': [
+            # Row 1
+            {f'key_{i}': list(range(i, i+10)) for i in range(50)},
+            
+            # Row 2
+            {'small_key': [1, 2, 3]},
+            
+            # Row 3
+            {f'big_key_{i}': list(range(i*10, (i+1)*10)) for i in range(50)}
+        ],
+        
+        # Complex nested structure (STRUCT<
+        #   a: ARRAY<INT>, 
+        #   b: MAP<STRING, ARRAY<INT>>,
+        #   c: STRUCT<x: ARRAY<INT>, y: STRING>
+        # >)
+        'complex_struct_col': [
+            # Row 1
+            {
+                'a': list(range(100)),
+                'b': {f'key_{i}': list(range(i, i+5)) for i in range(20)},
+                'c': {'x': list(range(50)), 'y': 'nested_struct_1'}
+            },
+            
+            # Row 2
+            {
+                'a': [1, 2, 3],
+                'b': {'small': [1, 2]},
+                'c': {'x': [1], 'y': 'small_struct'}
+            },
+            
+            # Row 3
+            {
+                'a': list(range(100, 200)),
+                'b': {f'big_{i}': list(range(i*5, (i+1)*5)) for i in 
range(20)},
+                'c': {'x': list(range(50)), 'y': 'nested_struct_2'}
+            }
+        ],
+        
+        # Description column (STRING)
+        'description': [
+            'Row with large nested arrays and structures',
+            'Row with small nested data',
+            'Row with another set of large nested arrays and structures'
+        ]
+    }
+    
+    # Create complex table structure
+    table = pa.Table.from_pydict({
+        'id': pa.array(data['id'], type=pa.int32()),
+        
+        # Multi-level array type
+        'nested_array_col': pa.array(data['nested_array_col'], 
+                                   type=pa.list_(pa.list_(pa.int32()))),
+        
+        # Struct array type
+        'array_struct_col': pa.array(data['array_struct_col'],
+                                   type=pa.list_(pa.struct([
+                                       ('x', pa.int32()),
+                                       ('y', pa.string())
+                                   ]))),
+        
+        # Map type
+        'map_array_col': pa.array(data['map_array_col'],
+                                type=pa.map_(pa.string(), 
pa.list_(pa.int32()))),
+        
+        # Complex nested structure type
+        'complex_struct_col': pa.array(data['complex_struct_col'],
+                                     type=pa.struct([
+                                         ('a', pa.list_(pa.int32())),
+                                         ('b', pa.map_(pa.string(), 
pa.list_(pa.int32()))),
+                                         ('c', pa.struct([
+                                             ('x', pa.list_(pa.int32())),
+                                             ('y', pa.string())
+                                         ]))
+                                     ])),
+        
+        'description': pa.array(data['description'], type=pa.string())
+    })
+    
+    # Write to parquet file
+    pq.write_table(
+        table,
+        output_file,
+        compression=None,  # No compression
+        version='2.6',
+        write_statistics=True,
+        row_group_size=3,        # All data in one row group
+        data_page_size=100,      # Very small page size
+        write_batch_size=1       # Minimum batch size
+    )
+
+def inspect_parquet_file(file_path):
+    """Inspect the structure of generated parquet file"""
+    pf = pq.ParquetFile(file_path)
+    print(f"\nFile: {file_path}")
+    print(f"Number of row groups: {pf.num_row_groups}")
+    
+    metadata = pf.metadata
+    schema = pf.schema
+    print(f"\nSchema: {schema}")
+    print(f"\nDetailed schema:")
+    for i in range(len(schema)):
+        print(f"Column {i}: {schema[i]}")
+    
+    for i in range(metadata.num_row_groups):
+        rg = metadata.row_group(i)
+        print(f"\nRow Group {i}:")
+        print(f"Num rows: {rg.num_rows}")
+        
+        for j in range(rg.num_columns):
+            col = rg.column(j)
+            print(f"\nColumn {j}:")
+            print(f"Path: {schema[j].name}")
+            print(f"Type: {col.physical_type}")
+            print(f"Encodings: {col.encodings}")
+            print(f"Total compressed size: {col.total_compressed_size}")
+            print(f"Total uncompressed size: {col.total_uncompressed_size}")
+            print(f"Number of values: {col.num_values}")
+            print(f"Data page offset: {col.data_page_offset}")
+            if col.dictionary_page_offset is not None:
+                print(f"Dictionary page offset: {col.dictionary_page_offset}")
+
+def format_value(value):
+    """Format value for printing"""
+    if isinstance(value, (list, dict)):
+        return f"{str(value)[:100]}... (length: {len(str(value))})"
+    return str(value)
+
+def read_and_print_file(file_path):
+    """Read and print file content"""
+    table = pq.read_table(file_path)
+    df = table.to_pandas()
+    print("\nFile content:")
+    
+    for i in range(len(df)):
+        print(f"\nRow {i}:")
+        for column in df.columns:
+            value = df.iloc[i][column]
+            print(f"{column}: {format_value(value)}")
+
+def inspect_pages_with_cli(file_path, parquet_cli_path=None):
+    """
+    Inspect page information using parquet-cli
+    
+    Args:
+        file_path: Path to the parquet file
+        parquet_cli_path: Optional path to parquet-cli jar file
+    """
+    if not parquet_cli_path:
+        print("\nSkipping parquet-cli inspection: No parquet-cli path 
provided")
+        return
+
+    print("\nParquet CLI Output:")
+    try:
+        cmd = f"java -jar {parquet_cli_path} pages {file_path}"
+        result = subprocess.run(cmd, shell=True, check=True, 
capture_output=True, text=True)
+        print(result.stdout)
+    except subprocess.CalledProcessError as e:
+        print(f"Error running parquet-cli: {e}")
+        if e.output:
+            print(f"Error output: {e.output}")
+    except Exception as e:
+        print(f"Unexpected error running parquet-cli: {e}")
+
+def save_test_data_info(output_file):
+    """Save detailed test data information to text file"""
+    info = {
+        "file_format": "Parquet",
+        "version": "2.6",
+        "compression": "None",
+        "row_group_size": 3,
+        "data_page_size": 100,
+        "write_batch_size": 1,
+        "output_file": output_file,
+        "schema": {
+            "id": "INT32",
+            "nested_array_col": "ARRAY<ARRAY<INT32>>",
+            "array_struct_col": "ARRAY<STRUCT<x: INT32, y: STRING>>",
+            "map_array_col": "MAP<STRING, ARRAY<INT32>>",
+            "complex_struct_col": """STRUCT<
+                a: ARRAY<INT32>,
+                b: MAP<STRING, ARRAY<INT32>>,
+                c: STRUCT<
+                    x: ARRAY<INT32>,
+                    y: STRING
+                >
+            >""",
+            "description": "STRING"
+        },
+        "test_cases": [
+            {
+                "row": 1,
+                "description": "Large nested data",
+                "characteristics": [
+                    "Large nested arrays (100 arrays of 10 elements each)",
+                    "Large struct array (500 elements)",
+                    "Large map (50 key-value pairs)",
+                    "Complex nested structure with large arrays"
+                ]
+            },
+            {
+                "row": 2,
+                "description": "Small nested data",
+                "characteristics": [
+                    "Small nested arrays (3 arrays of 2 elements each)",
+                    "Small struct array (2 elements)",
+                    "Small map (1 key-value pair)",
+                    "Complex nested structure with small arrays"
+                ]
+            },
+            {
+                "row": 3,
+                "description": "Another large nested data",
+                "characteristics": [
+                    "Large nested arrays (100 arrays of 10 elements each)",
+                    "Large struct array (500 elements)",
+                    "Large map (50 key-value pairs)",
+                    "Complex nested structure with large arrays"
+                ]
+            }
+        ]
+    }
+    
+    info_file = output_file.replace('.parquet', '_info.json')
+    with open(info_file, 'w') as f:
+        json.dump(info, f, indent=2)
+
+if __name__ == '__main__':
+    # Add command line argument parsing
+    parser = argparse.ArgumentParser(description='Generate and inspect parquet 
test data')
+    parser.add_argument('--parquet-cli', 
+                       help='Path to parquet-cli jar file',
+                       default=None)
+    parser.add_argument('--output',
+                       help='Output parquet file path',
+                       default=OUTPUT_PARQUET_FILE)
+    args = parser.parse_args()
+
+    # Use the output file path from command line or default
+    output_file = args.output
+
+    generate_cross_page_test_data(output_file)
+    inspect_parquet_file(output_file)
+    read_and_print_file(output_file)
+    inspect_pages_with_cli(output_file, args.parquet_cli)
+    save_test_data_info(output_file)
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py
new file mode 100644
index 00000000000..75ef7c7e755
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py
@@ -0,0 +1,196 @@
+import pyarrow as pa
+import pyarrow.parquet as pq
+import subprocess
+import argparse
+import json
+
+# Define the output file path as a constant
+OUTPUT_PARQUET_FILE = 'nested_cross_page_test3.parquet'
+
+def generate_cross_page_test_data(output_file):
+    # Create test data
+    data = {
+        # id column (INT32)
+        'id': [1, None, 3],
+        
+        # array column (ARRAY<INT>)
+        'array_col': [
+            # Row 1 - Large array to force cross-page
+            [1, None, 3, 4, 5] * 200,  # 1000 elements
+            
+            # Row 2 - Null array
+            None,
+            
+            # Row 3 - Another large array with nulls
+            [6, None, 8, None, 10] * 200  # 1000 elements
+        ],
+        
+        # description column (STRING)
+        'description': [
+            'This is a large array with repeated sequence [1,null,3,4,5]',
+            None,
+            'This is another large array with repeated sequence 
[6,null,8,null,10]'
+        ]
+    }
+    
+    # Create table structure
+    table = pa.Table.from_pydict({
+        'id': pa.array(data['id'], type=pa.int32()),
+        'array_col': pa.array(data['array_col'], type=pa.list_(pa.int32())),
+        'description': pa.array(data['description'], type=pa.string())
+    })
+    
+    # Write to parquet file
+    pq.write_table(
+        table,
+        output_file,
+        compression=None,  # No compression for predictable size
+        version='2.6',
+        write_statistics=True,
+        row_group_size=3,        # All data in one row group
+        data_page_size=100,      # Very small page size
+        write_batch_size=10      # Small batch size
+    )
+
+def inspect_parquet_file(file_path):
+    """Inspect the structure of generated parquet file"""
+    pf = pq.ParquetFile(file_path)
+    print(f"\nFile: {file_path}")
+    print(f"Number of row groups: {pf.num_row_groups}")
+    
+    metadata = pf.metadata
+    schema = pf.schema
+    print(f"\nSchema: {schema}")
+    print(f"\nDetailed schema:")
+    for i in range(len(schema)):
+        print(f"Column {i}: {schema[i]}")
+    
+    for i in range(metadata.num_row_groups):
+        rg = metadata.row_group(i)
+        print(f"\nRow Group {i}:")
+        print(f"Num rows: {rg.num_rows}")
+        
+        for j in range(rg.num_columns):
+            col = rg.column(j)
+            print(f"\nColumn {j}:")
+            print(f"Path: {schema[j].name}")
+            print(f"Type: {col.physical_type}")
+            print(f"Encodings: {col.encodings}")
+            print(f"Total compressed size: {col.total_compressed_size}")
+            print(f"Total uncompressed size: {col.total_uncompressed_size}")
+            print(f"Number of values: {col.num_values}")
+            print(f"Data page offset: {col.data_page_offset}")
+            if col.dictionary_page_offset is not None:
+                print(f"Dictionary page offset: {col.dictionary_page_offset}")
+
+def read_and_print_file(file_path):
+    """Read and print file content"""
+    table = pq.read_table(file_path)
+    df = table.to_pandas()
+    print("\nFile content:")
+    for i in range(len(df)):
+        print(f"\nRow {i}:")
+        print(f"ID: {df.iloc[i]['id']}")
+        arr = df.iloc[i]['array_col']
+        if arr is not None:
+            print(f"Array length: {len(arr)}")
+            print(f"First few elements: {arr[:5]}...")
+            print(f"Last few elements: ...{arr[-5:]}")
+        else:
+            print("Array: None")
+        print(f"Description: {df.iloc[i]['description']}")
+
+def inspect_pages_with_cli(file_path, parquet_cli_path=None):
+    """
+    Inspect page information using parquet-cli
+    
+    Args:
+        file_path: Path to the parquet file
+        parquet_cli_path: Optional path to parquet-cli jar file
+    """
+    if not parquet_cli_path:
+        print("\nSkipping parquet-cli inspection: No parquet-cli path 
provided")
+        return
+
+    print("\nParquet CLI Output:")
+    try:
+        cmd = f"java -jar {parquet_cli_path} pages {file_path}"
+        result = subprocess.run(cmd, shell=True, check=True, 
capture_output=True, text=True)
+        print(result.stdout)
+    except subprocess.CalledProcessError as e:
+        print(f"Error running parquet-cli: {e}")
+        if e.output:
+            print(f"Error output: {e.output}")
+    except Exception as e:
+        print(f"Unexpected error running parquet-cli: {e}")
+
+def save_test_data_info(output_file):
+    """Save detailed test data information to text file"""
+    info = {
+        "file_format": "Parquet",
+        "version": "2.6",
+        "compression": "None",
+        "row_group_size": 3,
+        "data_page_size": 100,
+        "write_batch_size": 10,
+        "output_file": output_file,
+        "schema": {
+            "id": "INT32",
+            "array_col": "ARRAY<INT32>",
+            "description": "STRING"
+        },
+        "test_cases": [
+            {
+                "row": 1,
+                "description": "Large array with nulls",
+                "characteristics": [
+                    "1000 elements",
+                    "Repeated sequence [1,null,3,4,5]",
+                    "Forces cross-page scenario"
+                ]
+            },
+            {
+                "row": 2,
+                "description": "Null array and values",
+                "characteristics": [
+                    "Entire array is null",
+                    "ID is null",
+                    "Description is null"
+                ]
+            },
+            {
+                "row": 3,
+                "description": "Another large array with nulls",
+                "characteristics": [
+                    "1000 elements",
+                    "Repeated sequence [6,null,8,null,10]",
+                    "Forces cross-page scenario"
+                ]
+            }
+        ]
+    }
+    
+    info_file = output_file.replace('.parquet', '_info.json')
+    with open(info_file, 'w') as f:
+        json.dump(info, f, indent=2)
+
+if __name__ == '__main__':
+    # Add command line argument parsing
+    parser = argparse.ArgumentParser(description='Generate and inspect parquet 
test data')
+    parser.add_argument('--parquet-cli', 
+                       help='Path to parquet-cli jar file',
+                       default=None)
+    parser.add_argument('--output',
+                       help='Output parquet file path',
+                       default=OUTPUT_PARQUET_FILE)
+    args = parser.parse_args()
+
+    # Use the output file path from command line or default
+    output_file = args.output
+
+    generate_cross_page_test_data(output_file)
+    inspect_parquet_file(output_file)
+    read_and_print_file(output_file)
+    inspect_pages_with_cli(output_file, args.parquet_cli)
+    save_test_data_info(output_file)
+
diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh
 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh
new file mode 100755
index 00000000000..f3136eaa200
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+set -x
+
+CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
+
+## mkdir and put data to hdfs
+cd "${CUR_DIR}" && rm -rf data/ && tar xzf data.tar.gz
+hadoop fs -mkdir -p /user/doris/suites/multi_catalog/
+hadoop fs -put "${CUR_DIR}"/data/* /user/doris/suites/multi_catalog/
+
+# create table
+hive -f "${CUR_DIR}/create_table.hql"
diff --git 
a/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out 
b/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out
new file mode 100644
index 00000000000..73049b7866f
Binary files /dev/null and 
b/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out 
differ
diff --git 
a/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy
 
b/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy
new file mode 100644
index 00000000000..d94f909142e
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_parquet_nested_types", 
"p0,external,hive,external_docker,external_docker_hive") {
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable Hive test.")
+        return;
+    }
+
+    for (String hivePrefix : ["hive2", "hive3"]) {
+        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String catalog_name = "${hivePrefix}_test_parquet_nested_types"
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties (
+            "type"="hms",
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+        );"""
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        
+        sql """ use multi_catalog """
+        
+        order_qt_nested_cross_page1_parquet_q1 """select array_col from nested_cross_page1_parquet where id = 1"""
+
+        order_qt_nested_cross_page1_parquet_q2 """select array_col from nested_cross_page1_parquet where id = 2"""
+
+        order_qt_nested_cross_page1_parquet_q3 """select array_col from nested_cross_page1_parquet where id = 3"""
+
+        order_qt_nested_cross_page1_parquet_q4 """
+            SELECT id, array_size(array_col) as arr_size
+            FROM nested_cross_page1_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q5 """
+            SELECT id, array_col[1] as first_elem, array_col[3] as third_elem
+            FROM nested_cross_page1_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q6 """
+            SELECT id, array_col
+            FROM nested_cross_page1_parquet
+            WHERE array_size(array_col) > 100
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q7 """
+            SELECT id, array_col
+            FROM nested_cross_page1_parquet
+            WHERE array_contains(array_col, 1)
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q8 """
+            SELECT id, array_col, description
+            FROM nested_cross_page1_parquet
+            WHERE id > 1 AND array_size(array_col) < 100
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q9 """
+            SELECT
+                id,
+                array_min(array_col) as min_val,
+                array_max(array_col) as max_val
+            FROM nested_cross_page1_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page1_parquet_q10 """
+            SELECT id, array_col
+            FROM nested_cross_page1_parquet
+            WHERE description LIKE '%large array%'
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page2_parquet_q1 """
+            SELECT
+            id,
+            nested_array_col,
+            array_size(nested_array_col) as outer_size
+            FROM nested_cross_page2_parquet
+            WHERE id = 1
+        """
+
+        order_qt_nested_cross_page2_parquet_q2 """
+            SELECT
+            id,
+            nested_array_col,
+            array_size(nested_array_col) as outer_size
+            FROM nested_cross_page2_parquet
+            WHERE id = 2
+        """
+
+        order_qt_nested_cross_page2_parquet_q3 """
+            SELECT
+            id,
+            nested_array_col,
+            array_size(nested_array_col) as outer_size
+            FROM nested_cross_page2_parquet
+            WHERE id = 3
+        """
+
+        order_qt_nested_cross_page2_parquet_q4 """
+            SELECT
+                id,
+                array_struct_col,
+                array_size(array_struct_col) as struct_arr_size
+            FROM nested_cross_page2_parquet
+            WHERE description LIKE '%large%'
+        """
+
+        order_qt_nested_cross_page2_parquet_q5 """
+            SELECT
+                id,
+                item_x as x_value,
+                item_y as y_value
+            FROM nested_cross_page2_parquet
+            LATERAL VIEW EXPLODE(array_struct_col) tmp AS item_x, item_y
+            WHERE id = 1 AND item_x > 100
+        """
+
+        order_qt_nested_cross_page2_parquet_q6 """
+            SELECT
+                id,
+                map_array_col,
+                array_size(map_array_col) as map_size
+            FROM nested_cross_page2_parquet
+            WHERE id = 2
+        """
+        
+        order_qt_nested_cross_page3_parquet_q1 """select array_col from nested_cross_page3_parquet where id = 1"""
+
+        order_qt_nested_cross_page3_parquet_q2 """select array_col from nested_cross_page3_parquet where id = 2"""
+
+        order_qt_nested_cross_page3_parquet_q3 """select array_col from nested_cross_page3_parquet where id = 3"""
+
+        order_qt_nested_cross_page3_parquet_q4 """
+            SELECT id, array_size(array_col) as arr_size
+            FROM nested_cross_page3_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q5 """
+            SELECT id, array_col[1] as first_elem, array_col[3] as third_elem
+            FROM nested_cross_page3_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q6 """
+            SELECT id, array_col
+            FROM nested_cross_page3_parquet
+            WHERE array_size(array_col) > 100
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q7 """
+            SELECT id, array_col
+            FROM nested_cross_page3_parquet
+            WHERE array_contains(array_col, 1)
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q8 """
+            SELECT id, array_col, description
+            FROM nested_cross_page3_parquet
+            WHERE id > 1 AND array_size(array_col) < 100
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q9 """
+            SELECT
+                id,
+                array_min(array_col) as min_val,
+                array_max(array_col) as max_val
+            FROM nested_cross_page3_parquet
+            ORDER BY id
+        """
+
+        order_qt_nested_cross_page3_parquet_q10 """
+            SELECT id, array_col
+            FROM nested_cross_page3_parquet
+            WHERE description LIKE '%large array%'
+            ORDER BY id
+        """
+
+        sql """drop catalog ${catalog_name};"""
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
