This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b4b126b817 [Feature](parquet-reader) Implements dict filter 
functionality parquet reader. (#17594)
b4b126b817 is described below

commit b4b126b817a78609354a1b1c6de75208d21c3479
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Mar 16 20:29:27 2023 +0800

    [Feature](parquet-reader) Implements dict filter functionality parquet 
reader. (#17594)
    
    Implements dict filter functionality parquet reader to improve performance.
---
 be/src/service/internal_service.cpp                |   2 +-
 be/src/vec/data_types/data_type.cpp                |   1 +
 .../vec/exec/format/parquet/bool_plain_decoder.cpp |   2 +-
 .../vec/exec/format/parquet/bool_plain_decoder.h   |   2 +-
 .../vec/exec/format/parquet/bool_rle_decoder.cpp   |   2 +-
 be/src/vec/exec/format/parquet/bool_rle_decoder.h  |   2 +-
 .../format/parquet/byte_array_dict_decoder.cpp     |  44 +-
 .../exec/format/parquet/byte_array_dict_decoder.h  |  10 +-
 .../format/parquet/byte_array_plain_decoder.cpp    |   3 +-
 .../exec/format/parquet/byte_array_plain_decoder.h |   2 +-
 be/src/vec/exec/format/parquet/decoder.h           |  26 +-
 .../exec/format/parquet/delta_bit_pack_decoder.h   |   9 +-
 .../format/parquet/fix_length_dict_decoder.hpp     |  59 +-
 .../format/parquet/fix_length_plain_decoder.cpp    |   3 +-
 .../exec/format/parquet/fix_length_plain_decoder.h |   2 +-
 .../parquet/vparquet_column_chunk_reader.cpp       |   9 +-
 .../format/parquet/vparquet_column_chunk_reader.h  |  30 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  66 ++-
 .../exec/format/parquet/vparquet_column_reader.h   |  33 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 641 ++++++++++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h    |  44 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  33 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  17 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  15 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |   6 +-
 be/src/vec/exec/scan/new_file_scan_node.cpp        |   3 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  86 ++-
 be/src/vec/exec/scan/vfile_scanner.h               |  15 +-
 be/src/vec/exec/scan/vscan_node.cpp                |   2 +
 be/src/vec/exec/scan/vscan_node.h                  |   2 +
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   6 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  14 +-
 32 files changed, 957 insertions(+), 234 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 407ad5dad0..257509a97c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -480,7 +480,7 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
-            reader.reset(new vectorized::ParquetReader(params, range, 
&io_ctx));
+            reader.reset(new vectorized::ParquetReader(params, range, &io_ctx, 
nullptr));
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
diff --git a/be/src/vec/data_types/data_type.cpp 
b/be/src/vec/data_types/data_type.cpp
index 6d70850564..aee0630585 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -56,6 +56,7 @@ void IDataType::update_avg_value_size_hint(const IColumn& 
column, double& avg_va
 
 ColumnPtr IDataType::create_column_const(size_t size, const Field& field) 
const {
     auto column = create_column();
+    column->reserve(1);
     column->insert(field);
     return ColumnConst::create(std::move(column), size);
 }
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
index 87e6a31828..ff05c559a5 100644
--- a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
@@ -43,7 +43,7 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                       ColumnSelectVector& select_vector) {
+                                       ColumnSelectVector& select_vector, bool 
is_dict_filter) {
     auto& column_data = 
static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.h 
b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
index 77ab7f4ccd..08e54ecd60 100644
--- a/be/src/vec/exec/format/parquet/bool_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
@@ -37,7 +37,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
 
diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp 
b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
index 563b6c68df..0856687bbf 100644
--- a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp
@@ -46,7 +46,7 @@ Status BoolRLEDecoder::skip_values(size_t num_values) {
 }
 
 Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                     ColumnSelectVector& select_vector) {
+                                     ColumnSelectVector& select_vector, bool 
is_dict_filter) {
     auto& column_data = 
static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.h 
b/be/src/vec/exec/format/parquet/bool_rle_decoder.h
index 0b3ba6e05d..6b0c94825f 100644
--- a/be/src/vec/exec/format/parquet/bool_rle_decoder.h
+++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.h
@@ -30,7 +30,7 @@ public:
     void set_data(Slice* slice) override;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
 
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp 
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
index e5a86c5061..6962b2a47b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -38,6 +38,7 @@ Status 
ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
         total_length += l;
     }
 
+    _dict_value_to_code.reserve(num_values);
     // For insert_many_strings_overflow
     _dict_data.resize(total_length + MAX_STRINGS_OVERFLOW_SIZE);
     _max_value_length = 0;
@@ -48,6 +49,7 @@ Status 
ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
         offset_cursor += 4;
         memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
         _dict_items.emplace_back(&_dict_data[offset], l);
+        _dict_value_to_code[StringRef(&_dict_data[offset], l)] = i;
         offset_cursor += l;
         offset += l;
         if (offset_cursor > length) {
@@ -63,19 +65,47 @@ Status 
ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
     return Status::OK();
 }
 
+Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& 
doris_column) {
+    doris_column->insert_many_strings_overflow(&_dict_items[0], 
_dict_items.size(),
+                                               _max_value_length);
+    return Status::OK();
+}
+
+Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column,
+                                            std::vector<int32_t>* dict_codes) {
+    for (int i = 0; i < string_column->size(); ++i) {
+        StringRef dict_value = string_column->get_data_at(i);
+        dict_codes->emplace_back(_dict_value_to_code[dict_value]);
+    }
+    return Status::OK();
+}
+
+MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column) {
+    auto res = ColumnString::create();
+    std::vector<StringRef> dict_values(dict_column->size());
+    const auto& data = dict_column->get_data();
+    for (size_t i = 0; i < dict_column->size(); ++i) {
+        dict_values[i] = _dict_items[data[i]];
+    }
+    res->insert_many_strings_overflow(&dict_values[0], dict_values.size(), 
_max_value_length);
+    return res;
+}
+
 Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                           ColumnSelectVector& select_vector) {
+                                           ColumnSelectVector& select_vector, 
bool is_dict_filter) {
     size_t non_null_size = select_vector.num_values() - 
select_vector.num_nulls();
-    if (doris_column->is_column_dictionary() &&
-        assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
-        assert_cast<ColumnDictI32&>(*doris_column)
-                .insert_many_dict_data(&_dict_items[0], _dict_items.size());
+    if (doris_column->is_column_dictionary()) {
+        ColumnDictI32& dict_column = 
assert_cast<ColumnDictI32&>(*doris_column);
+        if (dict_column.dict_size() == 0) {
+            dict_column.insert_many_dict_data(&_dict_items[0], 
_dict_items.size());
+        }
     }
     _indexes.resize(non_null_size);
     _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
 
-    if (doris_column->is_column_dictionary()) {
-        return _decode_dict_values(doris_column, select_vector);
+    if (doris_column->is_column_dictionary() || is_dict_filter) {
+        return _decode_dict_values(doris_column, select_vector, 
is_dict_filter);
     }
 
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h 
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
index a90226d7c3..8492937e83 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -31,10 +31,17 @@ public:
     ~ByteArrayDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override;
 
     Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t 
num_values) override;
 
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
+
+    Status get_dict_codes(const ColumnString* column_string,
+                          std::vector<int32_t>* dict_codes) override;
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* 
dict_column) override;
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
@@ -44,6 +51,7 @@ protected:
     std::vector<StringRef> _dict_items;
     std::vector<uint8_t> _dict_data;
     size_t _max_value_length;
+    std::unordered_map<StringRef, int32_t> _dict_value_to_code;
 };
 
 template <typename DecimalPrimitiveType>
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
index acc977dbe5..833472665b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
@@ -38,7 +38,8 @@ Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                            ColumnSelectVector& select_vector) 
{
+                                            ColumnSelectVector& select_vector,
+                                            bool is_dict_filter) {
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
     switch (logical_type) {
     case TypeIndex::String:
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h 
b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
index 84bed0dec3..cd35ceaa6f 100644
--- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
@@ -30,7 +30,7 @@ public:
     ~ByteArrayPlainDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
 
diff --git a/be/src/vec/exec/format/parquet/decoder.h 
b/be/src/vec/exec/format/parquet/decoder.h
index 827e377c2b..eb0d88bd63 100644
--- a/be/src/vec/exec/format/parquet/decoder.h
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -76,7 +76,7 @@ public:
 
     // Write the decoded values batch to doris's column
     virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                                 ColumnSelectVector& select_vector) = 0;
+                                 ColumnSelectVector& select_vector, bool 
is_dict_filter) = 0;
 
     virtual Status skip_values(size_t num_values) = 0;
 
@@ -84,6 +84,19 @@ public:
         return Status::NotSupported("set_dict is not supported");
     }
 
+    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
+        return Status::NotSupported("read_dict_values_to_column is not 
supported");
+    }
+
+    virtual Status get_dict_codes(const ColumnString* column_string,
+                                  std::vector<int32_t>* dict_codes) {
+        return Status::NotSupported("get_dict_codes is not supported");
+    }
+
+    virtual MutableColumnPtr convert_dict_column_to_string_column(const 
ColumnInt32* dict_column) {
+        LOG(FATAL) << "Method convert_dict_column_to_string_column is not 
supported";
+    }
+
 protected:
     int32_t _type_length;
     Slice* _data = nullptr;
@@ -136,11 +149,15 @@ protected:
      * Decode dictionary-coded values into doris_column, ensure that 
doris_column is ColumnDictI32 type,
      * and the coded values must be read into _indexes previously.
      */
-    Status _decode_dict_values(MutableColumnPtr& doris_column, 
ColumnSelectVector& select_vector) {
-        DCHECK(doris_column->is_column_dictionary());
+    Status _decode_dict_values(MutableColumnPtr& doris_column, 
ColumnSelectVector& select_vector,
+                               bool is_dict_filter) {
+        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
         size_t dict_index = 0;
         ColumnSelectVector::DataReadType read_type;
-        auto& column_data = 
assert_cast<ColumnDictI32&>(*doris_column).get_data();
+        PaddedPODArray<Int32>& column_data =
+                doris_column->is_column_dictionary()
+                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
+                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
         while (size_t run_length = select_vector.get_next_run(&read_type)) {
             switch (read_type) {
             case ColumnSelectVector::CONTENT: {
@@ -171,7 +188,6 @@ protected:
         return Status::OK();
     }
 
-protected:
     // For dictionary encoding
     std::unique_ptr<uint8_t[]> _dict = nullptr;
     std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h 
b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
index bdebd4d82f..f51941bb3a 100644
--- a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
+++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h
@@ -64,7 +64,7 @@ public:
             : DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {}
     ~DeltaBitPackDecoder() override = default;
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - 
select_vector.num_nulls();
         // decode values
         _values.resize(non_null_size);
@@ -75,7 +75,8 @@ public:
         _data->size = _values.size() * _type_length;
         // set decoded value with fix plain decoder
         init_values_converter();
-        return _type_converted_decoder->decode_values(doris_column, data_type, 
select_vector);
+        return _type_converted_decoder->decode_values(doris_column, data_type, 
select_vector,
+                                                      is_dict_filter);
     }
 
     Status decode(T* buffer, int num_values, int* out_num_values) {
@@ -155,7 +156,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override {
         size_t num_values = select_vector.num_values();
         size_t null_count = select_vector.num_nulls();
         // init read buffer
@@ -222,7 +223,7 @@ public:
     }
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override {
         size_t num_values = select_vector.num_values();
         size_t null_count = select_vector.num_nulls();
         _values.resize(num_values - null_count);
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp 
b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index e581a65d10..64daf9325a 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -20,6 +20,7 @@
 #include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/parquet/decoder.h"
 
 namespace doris::vectorized {
 
@@ -31,7 +32,7 @@ public:
     ~FixLengthDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - 
select_vector.num_nulls();
         if (doris_column->is_column_dictionary() &&
             assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
@@ -43,11 +44,22 @@ public:
             assert_cast<ColumnDictI32&>(*doris_column)
                     .insert_many_dict_data(&dict_items[0], dict_items.size());
         }
+        if (doris_column->is_column_dictionary()) {
+            ColumnDictI32& dict_column = 
assert_cast<ColumnDictI32&>(*doris_column);
+            if (dict_column.dict_size() == 0) {
+                std::vector<StringRef> dict_items;
+                dict_items.reserve(_dict_items.size());
+                for (int i = 0; i < _dict_items.size(); ++i) {
+                    dict_items.emplace_back((char*)(&_dict_items[i]), 
_type_length);
+                }
+                dict_column.insert_many_dict_data(&dict_items[0], 
dict_items.size());
+            }
+        }
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
 
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, 
is_dict_filter);
         }
 
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -364,7 +376,7 @@ public:
     ~FixLengthDictDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - 
select_vector.num_nulls();
         if (doris_column->is_column_dictionary() &&
             assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
@@ -379,8 +391,8 @@ public:
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
 
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, 
is_dict_filter);
         }
 
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -434,13 +446,47 @@ public:
         _dict = std::move(dict);
         char* dict_item_address = reinterpret_cast<char*>(_dict.get());
         _dict_items.resize(num_values);
+        _dict_value_to_code.reserve(num_values);
         for (size_t i = 0; i < num_values; ++i) {
             _dict_items[i] = dict_item_address;
+            _dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
             dict_item_address += _type_length;
         }
         return Status::OK();
     }
 
+    // Appends every dictionary entry to doris_column, in dictionary order (entry i is
+    // the value that set_dict() registered under code i). Each entry is passed as a
+    // StringRef covering _type_length bytes of the dictionary buffer.
+    // Always returns Status::OK().
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) override {
+        const size_t dict_items_size = _dict_items.size();
+        // Use reserve() rather than the sized constructor: a vector created with
+        // dict_items_size elements and then extended with emplace_back() would hold
+        // dict_items_size default-constructed StringRefs at the front, and those
+        // placeholders are what would be inserted instead of the real values.
+        std::vector<StringRef> dict_values;
+        dict_values.reserve(dict_items_size);
+        for (size_t i = 0; i < dict_items_size; ++i) {
+            dict_values.emplace_back(_dict_items[i], _type_length);
+        }
+        // data() stays well-defined when the dictionary is empty, unlike &dict_values[0].
+        doris_column->insert_many_strings(dict_values.data(), dict_items_size);
+        return Status::OK();
+    }
+
+    Status get_dict_codes(const ColumnString* string_column,
+                          std::vector<int32_t>* dict_codes) override {
+        size_t size = string_column->size();
+        dict_codes->reserve(size);
+        for (int i = 0; i < size; ++i) {
+            StringRef dict_value = string_column->get_data_at(i);
+            dict_codes->emplace_back(_dict_value_to_code[dict_value]);
+        }
+        return Status::OK();
+    }
+
+    // Builds a new ColumnString with one row per row of dict_column: each Int32
+    // dictionary code is replaced by the _type_length bytes of the dictionary entry it
+    // indexes. The codes are used as indexes into _dict_items without a range check.
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
+        auto res = ColumnString::create();
+        const auto& data = dict_column->get_data();
+        const size_t num_rows = dict_column->size();
+        // Use reserve() rather than the sized constructor: pre-sizing the vector and then
+        // calling emplace_back() would put num_rows default-constructed StringRefs ahead
+        // of the real values and double the number of rows in the result.
+        std::vector<StringRef> dict_values;
+        dict_values.reserve(num_rows);
+        for (size_t i = 0; i < num_rows; ++i) {
+            dict_values.emplace_back(_dict_items[data[i]], _type_length);
+        }
+        // data() stays well-defined when dict_column is empty, unlike &dict_values[0].
+        res->insert_many_strings(dict_values.data(), dict_values.size());
+        return res;
+    }
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
@@ -528,6 +574,7 @@ protected:
 
     // For dictionary encoding
     std::vector<char*> _dict_items;
+    std::unordered_map<StringRef, int32_t> _dict_value_to_code;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index e50fcc627b..01bfe67981 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -32,7 +32,8 @@ Status FixLengthPlainDecoder::skip_values(size_t num_values) {
 }
 
 Status FixLengthPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                            ColumnSelectVector& select_vector) 
{
+                                            ColumnSelectVector& select_vector,
+                                            bool is_dict_filter) {
     size_t non_null_size = select_vector.num_values() - 
select_vector.num_nulls();
     if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
index b8f516444e..c35a97fc3e 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
@@ -33,7 +33,7 @@ public:
     ~FixLengthPlainDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter) override;
 
     Status skip_values(size_t num_values) override;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d53023c1a2..9308624246 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -52,6 +52,9 @@ Status ColumnChunkReader::init() {
 }
 
 Status ColumnChunkReader::next_page() {
+    if (_state == HEADER_PARSED) {
+        return Status::OK();
+    }
     if (UNLIKELY(_state == NOT_INIT)) {
         return Status::Corruption("Should initialize chunk reader");
     }
@@ -258,16 +261,16 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, 
size_t n) {
 }
 
 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                        ColumnSelectVector& select_vector) {
+                                        ColumnSelectVector& select_vector, 
bool is_dict_filter) {
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
-    if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) {
+    if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && 
!_has_dict)) {
         return Status::IOError("Not dictionary coded");
     }
     if (UNLIKELY(_remaining_num_values < select_vector.num_values())) {
         return Status::IOError("Decode too many values in current page");
     }
     _remaining_num_values -= select_vector.num_values();
-    return _page_decoder->decode_values(doris_column, data_type, 
select_vector);
+    return _page_decoder->decode_values(doris_column, data_type, 
select_vector, is_dict_filter);
 }
 
 int32_t ColumnChunkReader::_get_type_length() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 2bf21fbf7b..303af52104 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -86,14 +86,13 @@ public:
 
     // Skip current page(will not read and parse) if the page is filtered by 
predicates.
     Status skip_page() {
+        Status res = Status::OK();
         _remaining_num_values = 0;
         if (_state == HEADER_PARSED) {
-            return _page_reader->skip_page();
+            res = _page_reader->skip_page();
         }
-        if (_state != DATA_LOADED) {
-            return Status::Corruption("Should parse page header to skip page");
-        }
-        return Status::OK();
+        _state = PAGE_SKIPPED;
+        return res;
     }
     // Skip some values(will not read and parse) in current page if the values 
are filtered by predicates.
     // when skip_data = false, the underlying decoder will not skip data,
@@ -124,7 +123,7 @@ public:
 
     // Decode values in current page into doris column.
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
-                         ColumnSelectVector& select_vector);
+                         ColumnSelectVector& select_vector, bool 
is_dict_filter);
 
     // Get the repetition level decoder of current page.
     LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
@@ -134,6 +133,8 @@ public:
     level_t max_rep_level() const { return _max_rep_level; }
     level_t max_def_level() const { return _max_def_level; }
 
+    bool has_dict() const { return _has_dict; };
+
     // Get page decoder
     Decoder* get_page_decoder() { return _page_decoder; }
 
@@ -142,8 +143,23 @@ public:
         return _statistics;
     }
 
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
+        return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
+                ->read_dict_values_to_column(doris_column);
+    }
+
+    Status get_dict_codes(const ColumnString* column_string, 
std::vector<int32_t>* dict_codes) {
+        return 
_decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes(
+                column_string, dict_codes);
+    }
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* 
dict_column) {
+        return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
+                ->convert_dict_column_to_string_column(dict_column);
+    }
+
 private:
-    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, 
DATA_LOADED };
+    enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, 
DATA_LOADED, PAGE_SKIPPED };
 
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 3e94a3082f..0357c77b22 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -214,7 +214,8 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
 }
 
 Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& 
doris_column,
-                                        DataTypePtr& type, ColumnSelectVector& 
select_vector) {
+                                        DataTypePtr& type, ColumnSelectVector& 
select_vector,
+                                        bool is_dict_filter) {
     if (num_values == 0) {
         return Status::OK();
     }
@@ -271,12 +272,12 @@ Status ScalarColumnReader::_read_values(size_t 
num_values, ColumnPtr& doris_colu
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, 
map_data_column);
     }
-    return _chunk_reader->decode_values(data_column, type, select_vector);
+    return _chunk_reader->decode_values(data_column, type, select_vector, 
is_dict_filter);
 }
 
 Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, 
DataTypePtr& type,
                                                ColumnSelectVector& 
select_vector, size_t batch_size,
-                                               size_t* read_rows, bool* eof) {
+                                               size_t* read_rows, bool* eof, 
bool is_dict_filter) {
     _rep_levels.resize(0);
     _def_levels.resize(0);
     size_t parsed_rows = 0;
@@ -373,7 +374,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& 
doris_column, DataType
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, 
map_data_column);
     }
-    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, 
select_vector));
+    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, 
select_vector, is_dict_filter));
     if (ancestor_nulls != 0) {
         _chunk_reader->skip_values(ancestor_nulls, false);
     }
@@ -384,10 +385,44 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& 
doris_column, DataType
     }
     return Status::OK();
 }
+// Tries to load the next page through _try_load_dict_page() and, when a page was loaded
+// and the chunk reader reports a dictionary, asks the chunk reader to fill `doris_column`
+// with the dictionary values.
+// `*has_dict` receives the chunk reader's has_dict() answer (false when no page was loaded).
+// `doris_column` is left untouched when there is nothing to read.
+Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_column,
+                                                      bool* has_dict) {
+    bool loaded = false;
+    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
+    // Test the value behind the out-parameter: the pointer itself is always non-null here,
+    // so checking it would request dictionary values even when the chunk has no dictionary.
+    if (loaded && *has_dict) {
+        return _chunk_reader->read_dict_values_to_column(doris_column);
+    }
+    return Status::OK();
+}
+
+// Forwards to the chunk reader's get_dict_codes() with the same arguments and returns
+// its status unchanged.
+Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string,
+                                          std::vector<int32_t>* dict_codes) {
+    return _chunk_reader->get_dict_codes(column_string, dict_codes);
+}
+
+// Forwards to the chunk reader's convert_dict_column_to_string_column() and returns the
+// column it produces. NOTE(review): judging by the name, the input holds dictionary codes
+// and the result holds the matching strings -- confirm against the decoder.
+MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column) {
+    return _chunk_reader->convert_dict_column_to_string_column(dict_column);
+}
+
+// Advances the chunk reader to its next page when the current page has no values left.
+// On return `*loaded` tells whether this call loaded a page, and `*has_dict` carries the
+// chunk reader's has_dict() answer (false when nothing was loaded).
+Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) {
+    // Pessimistic defaults; they are only overwritten once next_page() has succeeded.
+    *loaded = false;
+    *has_dict = false;
+    const bool page_exhausted = _chunk_reader->remaining_num_values() == 0;
+    if (!page_exhausted || !_chunk_reader->has_next_page()) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_chunk_reader->next_page());
+    *loaded = true;
+    *has_dict = _chunk_reader->has_dict();
+    return Status::OK();
+}
 
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                             ColumnSelectVector& select_vector, 
size_t batch_size,
-                                            size_t* read_rows, bool* eof) {
+                                            size_t* read_rows, bool* eof, bool 
is_dict_filter) {
     if (_chunk_reader->remaining_num_values() == 0) {
         if (!_chunk_reader->has_next_page()) {
             *eof = true;
@@ -398,7 +433,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
     }
     if (_nested_column) {
         RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
-        return _read_nested_column(doris_column, type, select_vector, 
batch_size, read_rows, eof);
+        return _read_nested_column(doris_column, type, select_vector, 
batch_size, read_rows, eof,
+                                   is_dict_filter);
     }
 
     // generate the row ranges that should be read
@@ -452,7 +488,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
             if (skip_whole_batch) {
                 RETURN_IF_ERROR(_skip_values(read_values));
             } else {
-                RETURN_IF_ERROR(_read_values(read_values, doris_column, type, 
select_vector));
+                RETURN_IF_ERROR(_read_values(read_values, doris_column, type, 
select_vector,
+                                             is_dict_filter));
             }
             has_read += read_values;
             _current_row_index += read_values;
@@ -478,7 +515,7 @@ Status 
ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read
 
 Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                            ColumnSelectVector& select_vector, 
size_t batch_size,
-                                           size_t* read_rows, bool* eof) {
+                                           size_t* read_rows, bool* eof, bool 
is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -499,7 +536,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr&
             const_cast<DataTypePtr&>(
                     (reinterpret_cast<const 
DataTypeArray*>(remove_nullable(type).get()))
                             ->get_nested_type()),
-            select_vector, batch_size, read_rows, eof));
+            select_vector, batch_size, read_rows, eof, is_dict_filter));
     if (*read_rows == 0) {
         return Status::OK();
     }
@@ -523,7 +560,7 @@ Status 
MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
 
 Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& 
type,
                                          ColumnSelectVector& select_vector, 
size_t batch_size,
-                                         size_t* read_rows, bool* eof) {
+                                         size_t* read_rows, bool* eof, bool 
is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -553,10 +590,11 @@ Status MapColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr& t
     bool key_eof = false;
     bool value_eof = false;
     RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, 
select_vector, batch_size,
-                                                  &key_rows, &key_eof));
+                                                  &key_rows, &key_eof, 
is_dict_filter));
     select_vector.reset();
     RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, 
select_vector,
-                                                    batch_size, &value_rows, 
&value_eof));
+                                                    batch_size, &value_rows, 
&value_eof,
+                                                    is_dict_filter));
     DCHECK_EQ(key_rows, value_rows);
     DCHECK_EQ(key_eof, value_eof);
     *read_rows = key_rows;
@@ -581,7 +619,7 @@ Status 
StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>
 }
 Status StructColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                             ColumnSelectVector& select_vector, 
size_t batch_size,
-                                            size_t* read_rows, bool* eof) {
+                                            size_t* read_rows, bool* eof, bool 
is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -609,7 +647,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
         size_t loop_rows = 0;
         bool loop_eof = false;
         _child_readers[i]->read_column_data(doris_field, doris_type, 
select_vector, batch_size,
-                                            &loop_rows, &loop_eof);
+                                            &loop_rows, &loop_eof, 
is_dict_filter);
         if (i != 0) {
             DCHECK_EQ(*read_rows, loop_rows);
             DCHECK_EQ(*eof, loop_eof);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 292eb06fba..7f9764ec06 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -82,7 +82,21 @@ public:
     virtual ~ParquetColumnReader() = default;
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t 
batch_size,
-                                    size_t* read_rows, bool* eof) = 0;
+                                    size_t* read_rows, bool* eof, bool 
is_dict_filter) = 0;
+
+    // Default implementation: ignores both arguments and returns a NotSupported status.
+    // Readers that can expose a dictionary override it.
+    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, 
bool* has_dict) {
+        return Status::NotSupported("read_dict_values_to_column is not 
supported");
+    }
+
+    // Default implementation: ignores both arguments and returns a NotSupported status.
+    virtual Status get_dict_codes(const ColumnString* column_string,
+                                  std::vector<int32_t>* dict_codes) {
+        return Status::NotSupported("get_dict_codes is not supported");
+    }
+
+    // Default implementation: logs at FATAL severity and has no return statement, so it
+    // relies on LOG(FATAL) never returning. Callers must only invoke this on readers that
+    // override it.
+    virtual MutableColumnPtr convert_dict_column_to_string_column(const 
ColumnInt32* dict_column) {
+        LOG(FATAL) << "Method convert_dict_column_to_string_column is not 
supported";
+    }
+
     static Status create(io::FileReaderSPtr file, FieldSchema* field,
                          const tparquet::RowGroup& row_group,
                          const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz,
@@ -118,7 +132,11 @@ public:
     Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* 
has_dict) override;
+    Status get_dict_codes(const ColumnString* column_string,
+                          std::vector<int32_t>* dict_codes) override;
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* 
dict_column) override;
     const std::vector<level_t>& get_rep_level() const override { return 
_rep_levels; }
     const std::vector<level_t>& get_def_level() const override { return 
_def_levels; }
     Statistics statistics() override {
@@ -136,10 +154,11 @@ private:
 
     Status _skip_values(size_t num_values);
     Status _read_values(size_t num_values, ColumnPtr& doris_column, 
DataTypePtr& type,
-                        ColumnSelectVector& select_vector);
+                        ColumnSelectVector& select_vector, bool 
is_dict_filter);
     Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                ColumnSelectVector& select_vector, size_t 
batch_size,
-                               size_t* read_rows, bool* eof);
+                               size_t* read_rows, bool* eof, bool 
is_dict_filter);
+    Status _try_load_dict_page(bool* loaded, bool* has_dict);
 };
 
 class ArrayColumnReader : public ParquetColumnReader {
@@ -150,7 +169,7 @@ public:
     Status init(std::unique_ptr<ParquetColumnReader> element_reader, 
FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
     const std::vector<level_t>& get_rep_level() const override {
         return _element_reader->get_rep_level();
     }
@@ -174,7 +193,7 @@ public:
                 std::unique_ptr<ParquetColumnReader> value_reader, 
FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _key_reader->get_rep_level();
@@ -207,7 +226,7 @@ public:
                 FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof) override;
+                            bool* eof, bool is_dict_filter) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _child_readers[0]->get_rep_level();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 71f77f3735..b48cd722b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -17,9 +17,14 @@
 
 #include "vparquet_group_reader.h"
 
+#include "exprs/create_predicate_function.h"
 #include "schema_desc.h"
 #include "util/simd/bits.h"
 #include "vec/columns/column_const.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
 #include "vparquet_column_reader.h"
 
 namespace doris::vectorized {
@@ -31,7 +36,7 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const int32_t row_group_id, const 
tparquet::RowGroup& row_group,
                                cctz::time_zone* ctz,
                                const PositionDeleteContext& 
position_delete_ctx,
-                               const LazyReadContext& lazy_read_ctx)
+                               const LazyReadContext& lazy_read_ctx, 
RuntimeState* state)
         : _file_reader(file_reader),
           _read_columns(read_columns),
           _row_group_id(row_group_id),
@@ -39,14 +44,35 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr 
file_reader,
           _remaining_rows(row_group.num_rows),
           _ctz(ctz),
           _position_delete_ctx(position_delete_ctx),
-          _lazy_read_ctx(lazy_read_ctx) {}
+          _lazy_read_ctx(lazy_read_ctx),
+          _state(state),
+          _obj_pool(new ObjectPool()) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
+    // Close every non-null dict filter conjunct context with the runtime state, then
+    // clear the object pool.
+    for (auto* ctx : _dict_filter_conjuncts) {
+        if (ctx) {
+            ctx->close(_state);
+        }
+    }
+    _obj_pool->clear();
 }
 
-Status RowGroupReader::init(const FieldDescriptor& schema, 
std::vector<RowRange>& row_ranges,
-                            std::unordered_map<int, tparquet::OffsetIndex>& 
col_offsets) {
+Status RowGroupReader::init(
+        const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
+        const TupleDescriptor* tuple_descriptor, const RowDescriptor* 
row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts) {
+    _tuple_descriptor = tuple_descriptor;
+    _row_descriptor = row_descriptor;
+    _col_name_to_slot_id = colname_to_slot_id;
+    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
+    if (not_single_slot_filter_conjuncts) {
+        _filter_conjuncts.insert(_filter_conjuncts.end(), 
not_single_slot_filter_conjuncts->begin(),
+                                 not_single_slot_filter_conjuncts->end());
+    }
     _merge_read_ranges(row_ranges);
     if (_read_columns.empty()) {
         // Query task that only select columns in path.
@@ -71,11 +97,137 @@ Status RowGroupReader::init(const FieldDescriptor& schema, 
std::vector<RowRange>
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
     }
+    // Check if single slot can be filtered by dict.
+    if (!_slot_id_to_filter_conjuncts) {
+        return Status::OK();
+    }
+    for (auto& predicate_col_name : _lazy_read_ctx.predicate_columns) {
+        auto field = 
const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
+        if (_can_filter_by_dict(predicate_col_name,
+                                
_row_group_meta.columns[field->physical_column_index].meta_data)) {
+            _dict_filter_col_names.emplace_back(predicate_col_name);
+        } else {
+            int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+            if (_slot_id_to_filter_conjuncts->find(slot_id) !=
+                _slot_id_to_filter_conjuncts->end()) {
+                for (VExprContext* ctx : 
_slot_id_to_filter_conjuncts->at(slot_id)) {
+                    _filter_conjuncts.push_back(ctx);
+                }
+            }
+        }
+    }
+    RETURN_IF_ERROR(_rewrite_dict_predicates());
     return Status::OK();
 }
 
+// Decides whether the single-slot conjuncts on `predicate_col_name` may be evaluated
+// against the column's dictionary. Returns true only when all of the following hold:
+//   * a slot with the column's slot id exists in the tuple descriptor and is a string type,
+//   * at least one conjunct is registered for that slot id,
+//   * `column_metadata` describes a chunk that is_dictionary_encoded() accepts,
+//   * no conjunct has an is_null_pred / is_not_null_pred function call at its root.
+bool RowGroupReader::_can_filter_by_dict(const string& predicate_col_name,
+                                         const tparquet::ColumnMetaData& column_metadata) {
+    const int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+    SlotDescriptor* slot = nullptr;
+    for (auto* each : _tuple_descriptor->slots()) {
+        if (each->id() == slot_id) {
+            slot = each;
+            break;
+        }
+    }
+    // No slot carries this id: fall back to regular filtering instead of dereferencing
+    // a null pointer.
+    if (slot == nullptr || !slot->type().is_string_type()) {
+        return false;
+    }
+
+    // Single lookup; the iterator is reused below to walk the conjuncts.
+    const auto conjuncts_it = _slot_id_to_filter_conjuncts->find(slot_id);
+    if (conjuncts_it == _slot_id_to_filter_conjuncts->end()) {
+        return false;
+    }
+
+    if (!is_dictionary_encoded(column_metadata)) {
+        return false;
+    }
+
+    // TODO: check exprs like 'a > 10 is null'; 'a > 10' should be filterable by dict.
+    for (VExprContext* ctx : conjuncts_it->second) {
+        const VExpr* root_expr = ctx->root();
+        if (root_expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            const std::string& function_name = root_expr->fn().name.function_name;
+            if (function_name == "is_null_pred" || function_name == "is_not_null_pred") {
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+// This function is adapted from
+// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
+//
+// Returns true when the column chunk metadata shows that every data page of the chunk is
+// dictionary encoded, false otherwise.
+bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
+    // The Parquet spec allows for column chunks to have mixed encodings
+    // where some data pages are dictionary-encoded and others are plain
+    // encoded. For example, a Parquet file writer might start writing
+    // a column chunk as dictionary encoded, but it will switch to plain
+    // encoding if the dictionary grows too large.
+    //
+    // In order for dictionary filters to skip the entire row group,
+    // the conjuncts must be evaluated on column chunks that are entirely
+    // encoded with the dictionary encoding. There are two checks
+    // available to verify this:
+    // 1. The encoding_stats field on the column chunk metadata provides
+    //    information about the number of data pages written in each
+    //    format. This allows for a specific check of whether all the
+    //    data pages are dictionary encoded.
+    // 2. The encodings field on the column chunk metadata lists the
+    //    encodings used. If this list contains the dictionary encoding
+    //    and does not include unexpected encodings (i.e. encodings not
+    //    associated with definition/repetition levels), then it is entirely
+    //    dictionary encoded.
+    const auto is_dict_encoding = [](tparquet::Encoding::type encoding) {
+        return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
+               encoding == tparquet::Encoding::RLE_DICTIONARY;
+    };
+
+    if (column_metadata.__isset.encoding_stats) {
+        // Condition #1 above. Both data page versions carry values, so a non-dictionary
+        // page of either version means the chunk is not entirely dictionary encoded.
+        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
+            const bool is_data_page = enc_stat.page_type == tparquet::PageType::DATA_PAGE ||
+                                      enc_stat.page_type == tparquet::PageType::DATA_PAGE_V2;
+            if (is_data_page && !is_dict_encoding(enc_stat.encoding) && enc_stat.count > 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Condition #2 above
+    bool has_dict_encoding = false;
+    for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
+        if (is_dict_encoding(encoding)) {
+            has_dict_encoding = true;
+        } else if (encoding != tparquet::Encoding::RLE &&
+                   encoding != tparquet::Encoding::BIT_PACKED) {
+            // RLE and BIT_PACKED are used for repetition/definition levels; any other
+            // encoding listed here is a non-dictionary value encoding.
+            return false;
+        }
+    }
+    // Entirely dictionary encoded only if a dictionary encoding is actually listed.
+    return has_dict_encoding;
+}
+
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* 
read_rows,
                                   bool* batch_eof) {
+    if (_is_row_group_filtered) {
+        *read_rows = 0;
+        *batch_eof = true;
+        return Status::OK();
+    }
+
     // Process external table query task that select columns are all from path.
     if (_read_columns.empty()) {
         RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
@@ -113,10 +265,13 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
             columns_to_filter[i] = i;
         }
         if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
-            int result_column_id = -1;
-            RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, 
&result_column_id));
-            ColumnPtr& filter_column = 
block->get_by_position(result_column_id).column;
-            RETURN_IF_ERROR(_filter_block(block, filter_column, 
column_to_keep, columns_to_filter));
+            std::vector<IColumn::Filter*> filters;
+            if (_position_delete_ctx.has_filter) {
+                filters.push_back(_pos_delete_filter_ptr.get());
+            }
+            
RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(_filter_conjuncts, filters, 
block,
+                                                                
columns_to_filter, column_to_keep));
+            _convert_dict_cols_to_string_cols(block);
         } else {
             RETURN_IF_ERROR(_filter_block(block, column_to_keep, 
columns_to_filter));
         }
@@ -139,6 +294,25 @@ Status RowGroupReader::_read_column_data(Block* block, 
const std::vector<std::st
         auto& column_with_type_and_name = block->get_by_name(read_col);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
+        auto col_iter =
+                std::find(_dict_filter_col_names.begin(), 
_dict_filter_col_names.end(), read_col);
+        bool is_dict_filter = false;
+        if (col_iter != _dict_filter_col_names.end()) {
+            MutableColumnPtr dict_column = ColumnVector<Int32>::create();
+            size_t pos = block->get_position_by_name(read_col);
+            if (column_type->is_nullable()) {
+                block->get_by_position(pos).type =
+                        
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+                block->replace_by_position(
+                        pos, ColumnNullable::create(std::move(dict_column),
+                                                    
ColumnUInt8::create(dict_column->size(), 0)));
+            } else {
+                block->get_by_position(pos).type = 
std::make_shared<DataTypeInt32>();
+                block->replace_by_position(pos, std::move(dict_column));
+            }
+            is_dict_filter = true;
+        }
+
         size_t col_read_rows = 0;
         bool col_eof = false;
         // Should reset _filter_map_index to 0 when reading next column.
@@ -147,7 +321,7 @@ Status RowGroupReader::_read_column_data(Block* block, 
const std::vector<std::st
             size_t loop_rows = 0;
             RETURN_IF_ERROR(_column_readers[read_col]->read_column_data(
                     column_ptr, column_type, select_vector, batch_size - 
col_read_rows, &loop_rows,
-                    &col_eof));
+                    &col_eof, is_dict_filter));
             col_read_rows += loop_rows;
         }
         if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
@@ -169,7 +343,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     size_t pre_read_rows;
     bool pre_eof;
     size_t origin_column_num = block->columns();
-    int filter_column_id = -1;
+    IColumn::Filter result_filter;
     while (true) {
         // read predicate columns
         pre_read_rows = 0;
@@ -194,16 +368,21 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
             // The following process may be tricky and time-consuming, but we 
have no other way.
             
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
         }
-        RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, 
&filter_column_id));
-        ColumnPtr& sv = block->get_by_position(filter_column_id).column;
+        result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
+        bool can_filter_all = false;
+        std::vector<IColumn::Filter*> filters;
+        if (_position_delete_ctx.has_filter) {
+            filters.push_back(_pos_delete_filter_ptr.get());
+        }
+        RETURN_IF_ERROR(_execute_conjuncts(_filter_conjuncts, filters, block, 
&result_filter,
+                                           &can_filter_all));
+
         if (_lazy_read_ctx.resize_first_column) {
             // We have to clean the first column to insert right data.
             block->get_by_position(0).column->assume_mutable()->clear();
         }
 
-        // build filter map
-        bool can_filter_all = false;
-        const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, 
&can_filter_all);
+        const uint8_t* __restrict filter_map = result_filter.data();
         select_vector_ptr.reset(new ColumnSelectVector(filter_map, 
pre_read_rows, can_filter_all));
         if (select_vector_ptr->filter_all() && !pre_eof) {
             // If continuous batches are skipped, we can cache them to skip a 
whole page
@@ -256,14 +435,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
             // generated from next batch, so the filter column is removed 
ahead.
             DCHECK_EQ(block->rows(), 0);
         } else {
-            const auto& filter_column = 
block->get_by_position(filter_column_id).column;
-            RETURN_IF_ERROR(_filter_block(block, filter_column, 
origin_column_num,
-                                          
_lazy_read_ctx.all_predicate_col_ids));
+            RETURN_IF_ERROR(_filter_block_internal(block, 
_lazy_read_ctx.all_predicate_col_ids,
+                                                   result_filter));
+            Block::erase_useless_column(block, origin_column_num);
         }
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
 
+    _convert_dict_cols_to_string_cols(block);
+
     size_t column_num = block->columns();
     size_t column_size = 0;
     for (int i = 0; i < column_num; ++i) {
@@ -283,55 +464,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     return _fill_missing_columns(block, column_size, 
_lazy_read_ctx.missing_columns);
 }
 
-const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t 
num_rows,
-                                                 bool* can_filter_all) {
-    const uint8_t* filter_map = nullptr;
-    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*sv)) {
-        size_t column_size = nullable_column->size();
-        if (column_size == 0) {
-            *can_filter_all = true;
-        } else {
-            DCHECK_EQ(column_size, num_rows);
-            const auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
-            ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(
-                    
nullable_column->get_nested_column_ptr()->assume_mutable().get());
-            auto* __restrict filter_data = concrete_column->get_data().data();
-            if (_position_delete_ctx.has_filter) {
-                auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
-                for (size_t i = 0; i < num_rows; ++i) {
-                    filter_data[i] &= (!null_map_data[i]) & 
pos_delete_filter_data[i];
-                }
-            } else {
-                for (size_t i = 0; i < num_rows; ++i) {
-                    filter_data[i] &= (!null_map_data[i]);
-                }
-            }
-            filter_map = filter_data;
-        }
-    } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
-        // filter all
-        *can_filter_all = !const_column->get_bool(0);
-    } else {
-        MutableColumnPtr mutable_holder = sv->assume_mutable();
-        ColumnUInt8* mutable_filter_column = 
typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        IColumn::Filter& filter = mutable_filter_column->get_data();
-        auto* __restrict filter_data = filter.data();
-        const size_t size = filter.size();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= pos_delete_filter_data[i];
-            }
-        }
-        filter_map = filter_data;
-    }
-    return filter_map;
-}
-
 void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
                                             std::unique_ptr<uint8_t[]>& 
filter_map,
-                                            size_t pre_read_rows) {
+                                            size_t pre_read_rows) const {
     if (_cached_filtered_rows == 0) {
         return;
     }
@@ -493,73 +628,6 @@ Status RowGroupReader::_build_pos_delete_filter(size_t 
read_rows) {
     return Status::OK();
 }
 
-Status RowGroupReader::_filter_block(Block* block, const ColumnPtr& 
filter_column,
-                                     int column_to_keep, std::vector<uint32_t> 
columns_to_filter) {
-    if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
-        const auto& nested_column = nullable_column->get_nested_column_ptr();
-
-        MutableColumnPtr mutable_holder =
-                nested_column->use_count() == 1
-                        ? nested_column->assume_mutable()
-                        : nested_column->clone_resized(nested_column->size());
-
-        ColumnUInt8* concrete_column = 
typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        if (!concrete_column) {
-            return Status::InvalidArgument(
-                    "Illegal type {} of column for filter. Must be UInt8 or 
Nullable(UInt8).",
-                    filter_column->get_name());
-        }
-        auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
-        IColumn::Filter& filter = concrete_column->get_data();
-        auto* __restrict filter_data = filter.data();
-        const size_t size = filter.size();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= (!null_map_data[i]) & 
pos_delete_filter_data[i];
-            }
-        } else {
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= (!null_map_data[i]);
-            }
-        }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, 
filter));
-    } else if (auto* const_column = 
check_and_get_column<ColumnConst>(*filter_column)) {
-        bool ret = const_column->get_bool(0);
-        if (!ret) {
-            for (auto& col : columns_to_filter) {
-                
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
-            }
-        }
-    } else {
-        MutableColumnPtr mutable_holder =
-                filter_column->use_count() == 1
-                        ? filter_column->assume_mutable()
-                        : filter_column->clone_resized(filter_column->size());
-        ColumnUInt8* mutable_filter_column = 
typeid_cast<ColumnUInt8*>(mutable_holder.get());
-        if (!mutable_filter_column) {
-            return Status::InvalidArgument(
-                    "Illegal type {} of column for filter. Must be UInt8 or 
Nullable(UInt8).",
-                    filter_column->get_name());
-        }
-
-        IColumn::Filter& filter = mutable_filter_column->get_data();
-        auto* __restrict filter_data = filter.data();
-
-        if (_position_delete_ctx.has_filter) {
-            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
-            const size_t size = filter.size();
-            for (size_t i = 0; i < size; ++i) {
-                filter_data[i] &= pos_delete_filter_data[i];
-            }
-        }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, 
filter));
-    }
-    Block::erase_useless_column(block, column_to_keep);
-    return Status::OK();
-}
-
 Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
                                      const std::vector<uint32_t>& 
columns_to_filter) {
     if (_pos_delete_filter_ptr) {
@@ -599,6 +667,233 @@ Status RowGroupReader::_filter_block_internal(Block* 
block,
     return Status::OK();
 }
 
+Status RowGroupReader::_rewrite_dict_predicates() {
+    for (vector<std::string>::iterator it = _dict_filter_col_names.begin();
+         it != _dict_filter_col_names.end();) {
+        std::string& dict_filter_col_name = *it;
+        int slot_id = _col_name_to_slot_id->at(dict_filter_col_name);
+        // 1. Get dictionary values to a string column.
+        MutableColumnPtr dict_value_column = ColumnString::create();
+        bool has_dict = false;
+        
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
+                dict_value_column, &has_dict));
+        size_t dict_value_column_size = dict_value_column->size();
+        DCHECK(has_dict);
+        // 2. Build a temp block from the dict string column, then execute conjuncts and filter it.
+        // 2.1 The temp block gets one column per materialized slot so the conjuncts can run.
+        Block temp_block;
+        int dict_pos = -1;
+        int index = 0;
+        for (const auto slot_desc : _tuple_descriptor->slots()) {
+            if (!slot_desc->need_materialize()) {
+                // should be ignored from reading
+                continue;
+            }
+            if (slot_desc->id() == slot_id) {
+                auto data_type = slot_desc->get_data_type_ptr();
+                if (data_type->is_nullable()) {
+                    temp_block.insert(
+                            
{ColumnNullable::create(std::move(dict_value_column),
+                                                    
ColumnUInt8::create(dict_value_column_size, 0)),
+                             
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
+                             ""});
+                } else {
+                    temp_block.insert(
+                            {std::move(dict_value_column), 
std::make_shared<DataTypeString>(), ""});
+                }
+                dict_pos = index;
+
+            } else {
+                
temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                        
slot_desc->get_data_type_ptr(),
+                                                        
slot_desc->col_name()));
+            }
+            ++index;
+        }
+
+        // 2.2 Execute conjuncts and filter block.
+        const std::vector<VExprContext*>* ctxs = nullptr;
+        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
+        if (iter != _slot_id_to_filter_conjuncts->end()) {
+            ctxs = &(iter->second);
+        } else {
+            std::stringstream msg;
+            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] 
not found";
+            return Status::NotFound(msg.str());
+        }
+
+        std::vector<uint32_t> columns_to_filter(1, dict_pos);
+        int column_to_keep = temp_block.columns();
+        if (dict_pos != 0) {
+            // VExprContext::execute only performs filtering when block->rows() > 0,
+            // so column 0 is temporarily resized below; tricky and costly, but no other way.
+            
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
+        }
+        std::vector<IColumn::Filter*> filters;
+        RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, 
&temp_block,
+                                                            columns_to_filter, 
column_to_keep));
+        if (dict_pos != 0) {
+            // Clear the padding rows from the first column so real data can be inserted later.
+            temp_block.get_by_position(0).column->assume_mutable()->clear();
+        }
+
+        // Check some conditions.
+        ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
+        // If no dictionary value passes the conjuncts, this whole row group can be filtered out.
+        if (dict_column->size() == 0) {
+            _is_row_group_filtered = true;
+            return Status::OK();
+        }
+
+        // Performance: with too many dict values left, the IN filter gets huge; skip the rewrite.
+        if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+            for (auto& ctx : (*ctxs)) {
+                _filter_conjuncts.push_back(ctx);
+            }
+            it = _dict_filter_col_names.erase(it);
+            continue;
+        }
+
+        // 3. Get dict codes.
+        std::vector<int32_t> dict_codes;
+        if (dict_column->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    static_cast<const ColumnNullable*>(dict_column.get());
+            const ColumnString* nested_column = static_cast<const 
ColumnString*>(
+                    nullable_column->get_nested_column_ptr().get());
+            
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(nested_column), 
&dict_codes));
+        } else {
+            
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(dict_column.get()), 
&dict_codes));
+        }
+
+        // 4. Rewrite conjuncts.
+        _rewrite_dict_conjuncts(dict_codes, slot_id);
+        ++it;
+    }
+    return Status::OK();
+}
+
+Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& 
dict_codes, int slot_id) {
+    VExpr* root;
+    if (dict_codes.size() == 1) {
+        {
+            TFunction fn;
+            TFunctionName fn_name;
+            fn_name.__set_db_name("");
+            fn_name.__set_function_name("eq");
+            fn.__set_name(fn_name);
+            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+            std::vector<TTypeDesc> arg_types;
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            fn.__set_arg_types(arg_types);
+            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_has_var_args(false);
+
+            TExprNode texpr_node;
+            
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+            texpr_node.__set_opcode(TExprOpcode::EQ);
+            texpr_node.__set_vector_opcode(TExprOpcode::EQ);
+            texpr_node.__set_fn(fn);
+            texpr_node.__set_child_type(TPrimitiveType::INT);
+            texpr_node.__set_num_children(2);
+            root = _obj_pool->add(new VectorizedFnCall(texpr_node));
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = 
_tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+        {
+            TExprNode texpr_node;
+            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+            texpr_node.__set_type(create_type_desc(TYPE_INT));
+            TIntLiteral int_literal;
+            int_literal.__set_value(dict_codes[0]);
+            texpr_node.__set_int_literal(int_literal);
+            VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node));
+            root->add_child(literal_expr);
+        }
+    } else {
+        {
+            TTypeDesc type_desc = 
create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(TExprOpcode::FILTER_IN);
+
+            root = _obj_pool->add(new vectorized::VDirectInPredicate(node));
+            std::shared_ptr<HybridSetBase> 
hybrid_set(create_set(PrimitiveType::TYPE_INT));
+            for (int j = 0; j < dict_codes.size(); ++j) {
+                hybrid_set->insert(&dict_codes[j]);
+            }
+            
static_cast<vectorized::VDirectInPredicate*>(root)->set_filter(hybrid_set);
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = 
_tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+    }
+    VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new 
VExprContext(root));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
+    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    return Status::OK();
+}
+
+void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+    for (auto& dict_filter_col_name : _dict_filter_col_names) {
+        size_t pos = block->get_position_by_name(dict_filter_col_name);
+        ColumnWithTypeAndName& column_with_type_and_name = 
block->get_by_position(pos);
+        const ColumnPtr& column = column_with_type_and_name.column;
+        if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*column)) {
+            const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
+            const ColumnInt32* dict_column = assert_cast<const 
ColumnInt32*>(nested_column.get());
+            DCHECK(dict_column);
+
+            MutableColumnPtr string_column =
+                    
_column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
+                            dict_column);
+
+            column_with_type_and_name.type =
+                    
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            block->replace_by_position(
+                    pos, ColumnNullable::create(std::move(string_column),
+                                                
nullable_column->get_null_map_column_ptr()));
+        } else {
+            const ColumnInt32* dict_column = assert_cast<const 
ColumnInt32*>(column.get());
+            MutableColumnPtr string_column =
+                    
_column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
+                            dict_column);
+
+            column_with_type_and_name.type = 
std::make_shared<DataTypeString>();
+            block->replace_by_position(pos, std::move(string_column));
+        }
+    }
+}
+
 ParquetColumnReader::Statistics RowGroupReader::statistics() {
     ParquetColumnReader::Statistics st;
     for (auto& reader : _column_readers) {
@@ -608,4 +903,86 @@ ParquetColumnReader::Statistics 
RowGroupReader::statistics() {
     return st;
 }
 
+// TODO Performance Optimization
+Status RowGroupReader::_execute_conjuncts(const std::vector<VExprContext*>& 
ctxs,
+                                          const std::vector<IColumn::Filter*>& 
filters,
+                                          Block* block, IColumn::Filter* 
result_filter,
+                                          bool* can_filter_all) {
+    *can_filter_all = false;
+    auto* __restrict result_filter_data = result_filter->data();
+    for (auto* ctx : ctxs) {
+        int result_column_id = -1;
+        RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
+        ColumnPtr& filter_column = 
block->get_by_position(result_column_id).column;
+        if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
+            size_t column_size = nullable_column->size();
+            if (column_size == 0) {
+                *can_filter_all = true;
+                return Status::OK();
+            } else {
+                const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
+                const IColumn::Filter& filter =
+                        assert_cast<const 
ColumnUInt8&>(*nested_column).get_data();
+                auto* __restrict filter_data = filter.data();
+                const size_t size = filter.size();
+                auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
+
+                for (size_t i = 0; i < size; ++i) {
+                    result_filter_data[i] &= (!null_map_data[i]) & 
filter_data[i];
+                }
+                if (memchr(filter_data, 0x1, size) == nullptr) {
+                    *can_filter_all = true;
+                    return Status::OK();
+                }
+            }
+        } else if (auto* const_column = 
check_and_get_column<ColumnConst>(*filter_column)) {
+            // filter all
+            if (!const_column->get_bool(0)) {
+                *can_filter_all = true;
+                return Status::OK();
+            }
+        } else {
+            const IColumn::Filter& filter =
+                    assert_cast<const ColumnUInt8&>(*filter_column).get_data();
+            auto* __restrict filter_data = filter.data();
+
+            const size_t size = filter.size();
+            for (size_t i = 0; i < size; ++i) {
+                result_filter_data[i] &= filter_data[i];
+            }
+
+            if (memchr(filter_data, 0x1, size) == nullptr) {
+                *can_filter_all = true;
+                return Status::OK();
+            }
+        }
+    }
+    for (auto* filter : filters) {
+        auto* __restrict filter_data = filter->data();
+        const size_t size = filter->size();
+        for (size_t i = 0; i < size; ++i) {
+            result_filter_data[i] &= filter_data[i];
+        }
+    }
+    return Status::OK();
+}
+
+// TODO Performance Optimization
+Status RowGroupReader::_execute_conjuncts_and_filter_block(
+        const std::vector<VExprContext*>& ctxs, const 
std::vector<IColumn::Filter*>& filters,
+        Block* block, std::vector<uint32_t>& columns_to_filter, int 
column_to_keep) {
+    IColumn::Filter result_filter(block->rows(), 1);
+    bool can_filter_all;
+    RETURN_IF_ERROR(_execute_conjuncts(ctxs, filters, block, &result_filter, 
&can_filter_all));
+    if (can_filter_all) {
+        for (auto& col : columns_to_filter) {
+            
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+        }
+    } else {
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, 
result_filter));
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index cd1f92a909..99cb6cb85a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -25,6 +25,9 @@
 
 namespace doris::vectorized {
 
+// TODO: determine a practical limit by testing; uint32 max effectively disables the check.
+static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = 
std::numeric_limits<uint32_t>::max();
+
 class RowGroupReader {
 public:
     static const std::vector<int64_t> NO_DELETE;
@@ -103,11 +106,16 @@ public:
                    const std::vector<ParquetReadColumn>& read_columns, const 
int32_t row_group_id,
                    const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
                    const PositionDeleteContext& position_delete_ctx,
-                   const LazyReadContext& lazy_read_ctx);
+                   const LazyReadContext& lazy_read_ctx, RuntimeState* state);
 
     ~RowGroupReader();
-    Status init(const FieldDescriptor& schema, std::vector<RowRange>& 
row_ranges,
-                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
+    Status init(
+            const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+            std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
+            const TupleDescriptor* tuple_descriptor, const RowDescriptor* 
row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts);
     Status next_batch(Block* block, size_t batch_size, size_t* read_rows, 
bool* batch_eof);
     int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; 
}
 
@@ -120,9 +128,8 @@ private:
                              size_t batch_size, size_t* read_rows, bool* 
batch_eof,
                              ColumnSelectVector& select_vector);
     Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, 
bool* batch_eof);
-    const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* 
can_filter_all);
     void _rebuild_select_vector(ColumnSelectVector& select_vector,
-                                std::unique_ptr<uint8_t[]>& filter_map, size_t 
pre_read_rows);
+                                std::unique_ptr<uint8_t[]>& filter_map, size_t 
pre_read_rows) const;
     Status _fill_partition_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, std::tuple<std::string, 
const SlotDescriptor*>>&
@@ -131,13 +138,26 @@ private:
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContext*>& 
missing_columns);
     Status _build_pos_delete_filter(size_t read_rows);
-    Status _filter_block(Block* block, const ColumnPtr& filter_column, int 
column_to_keep,
-                         std::vector<uint32_t> columns_to_filter);
     Status _filter_block(Block* block, int column_to_keep,
                          const vector<uint32_t>& columns_to_filter);
     Status _filter_block_internal(Block* block, const vector<uint32_t>& 
columns_to_filter,
                                   const IColumn::Filter& filter);
 
+    bool _can_filter_by_dict(const string& predicate_col_name,
+                             const tparquet::ColumnMetaData& column_metadata);
+    bool is_dictionary_encoded(const tparquet::ColumnMetaData& 
column_metadata);
+    Status _rewrite_dict_predicates();
+    Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int 
slot_id);
+    void _convert_dict_cols_to_string_cols(Block* block);
+    Status _execute_conjuncts(const std::vector<VExprContext*>& ctxs,
+                              const std::vector<IColumn::Filter*>& filters, 
Block* block,
+                              IColumn::Filter* result_filter, bool* 
can_filter_all);
+    Status _execute_conjuncts_and_filter_block(const 
std::vector<VExprContext*>& ctxs,
+                                               const 
std::vector<IColumn::Filter*>& filters,
+                                               Block* block,
+                                               std::vector<uint32_t>& 
columns_to_filter,
+                                               int column_to_keep);
+
     io::FileReaderSPtr _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> 
_column_readers;
     const std::vector<ParquetReadColumn>& _read_columns;
@@ -156,5 +176,15 @@ private:
     std::unique_ptr<TextConverter> _text_converter = nullptr;
     std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = nullptr;
     int64_t _total_read_rows = 0;
+    const TupleDescriptor* _tuple_descriptor;
+    const RowDescriptor* _row_descriptor;
+    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    const std::unordered_map<int, std::vector<VExprContext*>>* 
_slot_id_to_filter_conjuncts;
+    std::vector<VExprContext*> _dict_filter_conjuncts;
+    std::vector<VExprContext*> _filter_conjuncts;
+    std::vector<std::string> _dict_filter_col_names;
+    RuntimeState* _state;
+    std::shared_ptr<ObjectPool> _obj_pool;
+    bool _is_row_group_filtered = false;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c61f60ad89..7c68f1925a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -34,7 +34,7 @@ namespace doris::vectorized {
 
 ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
                              const TFileRangeDesc& range, size_t batch_size, 
cctz::time_zone* ctz,
-                             IOContext* io_ctx)
+                             IOContext* io_ctx, RuntimeState* state)
         : _profile(profile),
           _scan_params(params),
           _scan_range(range),
@@ -42,15 +42,20 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _state(state) {
     _init_profile();
     _init_system_properties();
     _init_file_description();
 }
 
 ParquetReader::ParquetReader(const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
-                             IOContext* io_ctx)
-        : _profile(nullptr), _scan_params(params), _scan_range(range), 
_io_ctx(io_ctx) {
+                             IOContext* io_ctx, RuntimeState* state)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _io_ctx(io_ctx),
+          _state(state) {
     _init_system_properties();
     _init_file_description();
 }
@@ -195,7 +200,17 @@ Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,
         std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
-        VExprContext* vconjunct_ctx, bool filter_groups) {
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts,
+        bool filter_groups) {
+    _tuple_descriptor = tuple_descriptor;
+    _row_descriptor = row_descriptor;
+    _colname_to_slot_id = colname_to_slot_id;
+    _not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts;
+    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
     if (_file_metadata == nullptr) {
         return Status::InternalError("failed to init parquet reader, please 
open reader first");
     }
@@ -467,10 +482,12 @@ Status ParquetReader::_next_row_group_reader() {
             _get_position_delete_ctx(row_group, row_group_index);
     _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
                                                    
row_group_index.row_group_id, row_group, _ctz,
-                                                   position_delete_ctx, 
_lazy_read_ctx));
+                                                   position_delete_ctx, 
_lazy_read_ctx, _state));
     _row_group_eof = false;
-    return _current_group_reader->init(_file_metadata->schema(), 
candidate_row_ranges,
-                                       _col_offsets);
+    return _current_group_reader->init(_file_metadata->schema(), 
candidate_row_ranges, _col_offsets,
+                                       _tuple_descriptor, _row_descriptor, 
_colname_to_slot_id,
+                                       _not_single_slot_filter_conjuncts,
+                                       _slot_id_to_filter_conjuncts);
 }
 
 Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 10e04bdabb..f21996a1df 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -58,10 +58,10 @@ public:
 
     ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                   const TFileRangeDesc& range, size_t batch_size, 
cctz::time_zone* ctz,
-                  IOContext* io_ctx);
+                  IOContext* io_ctx, RuntimeState* state);
 
     ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
-                  IOContext* io_ctx);
+                  IOContext* io_ctx, RuntimeState* state);
 
     ~ParquetReader() override;
     // for test
@@ -73,7 +73,12 @@ public:
             const std::vector<std::string>& all_column_names,
             const std::vector<std::string>& missing_column_names,
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
-            VExprContext* vconjunct_ctx, bool filter_groups = true);
+            VExprContext* vconjunct_ctx, const TupleDescriptor* 
tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts,
+            bool filter_groups = true);
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
@@ -204,5 +209,11 @@ private:
     ParquetProfile _parquet_profile;
     bool _closed = false;
     IOContext* _io_ctx;
+    RuntimeState* _state;
+    const TupleDescriptor* _tuple_descriptor;
+    const RowDescriptor* _row_descriptor;
+    const std::unordered_map<std::string, int>* _colname_to_slot_id;
+    const std::vector<VExprContext*>* _not_single_slot_filter_conjuncts;
+    const std::unordered_map<int, std::vector<VExprContext*>>* 
_slot_id_to_filter_conjuncts;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index ce36da6a92..353b3bb198 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -63,7 +63,11 @@ Status IcebergTableReader::init_reader(
         const std::vector<std::string>& file_col_names,
         const std::unordered_map<int, std::string>& col_id_name_map,
         std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
-        VExprContext* vconjunct_ctx) {
+        VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts) {
     ParquetReader* parquet_reader = 
static_cast<ParquetReader*>(_file_format_reader.get());
     _col_id_name_map = col_id_name_map;
     _file_col_names = file_col_names;
@@ -73,8 +77,10 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
-    Status status = parquet_reader->init_reader(_all_required_col_names, 
_not_in_file_col_names,
-                                                &_new_colname_to_value_range, 
vconjunct_ctx);
+    Status status = parquet_reader->init_reader(
+            _all_required_col_names, _not_in_file_col_names, 
&_new_colname_to_value_range,
+            vconjunct_ctx, tuple_descriptor, row_descriptor, 
colname_to_slot_id,
+            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
     return status;
 }
 
@@ -181,7 +187,7 @@ Status IcebergTableReader::_position_delete(
             delete_range.file_size = -1;
             ParquetReader delete_reader(_profile, _params, delete_range, 
102400,
                                         
const_cast<cctz::time_zone*>(&_state->timezone_obj()),
-                                        _io_ctx);
+                                        _io_ctx, _state);
             if (!init_schema) {
                 delete_reader.get_parsed_schema(&delete_file_col_names, 
&delete_file_col_types);
                 init_schema = true;
@@ -191,6 +197,7 @@ Status IcebergTableReader::_position_delete(
                 return nullptr;
             }
             create_status = delete_reader.init_reader(delete_file_col_names, 
_not_in_file_col_names,
+                                                      nullptr, nullptr, 
nullptr, nullptr, nullptr,
                                                       nullptr, nullptr, false);
             if (!create_status.ok()) {
                 return nullptr;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 0f3343e3ca..3e57eda86b 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -63,7 +63,11 @@ public:
             const std::vector<std::string>& file_col_names,
             const std::unordered_map<int, std::string>& col_id_name_map,
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
-            VExprContext* vconjunct_ctx);
+            VExprContext* vconjunct_ctx, const TupleDescriptor* 
tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, std::vector<VExprContext*>>* 
slot_id_to_filter_conjuncts);
 
     enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
 
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 50b54f1691..f6b3c98573 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -93,7 +93,8 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* 
scanners) {
                                              runtime_profile(), _kv_cache);
         _scanner_pool.add(scanner);
         RETURN_IF_ERROR(((VFileScanner*)scanner)
-                                ->prepare(_vconjunct_ctx_ptr.get(), 
&_colname_to_value_range));
+                                ->prepare(_vconjunct_ctx_ptr.get(), 
&_colname_to_value_range,
+                                          &_colname_to_slot_id));
         scanners->push_back(scanner);
     }
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index fbc49940bc..aaa0ec9f36 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -36,6 +36,7 @@
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exprs/vslot_ref.h"
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
@@ -59,9 +60,11 @@ VFileScanner::VFileScanner(RuntimeState* state, 
NewFileScanNode* parent, int64_t
 
 Status VFileScanner::prepare(
         VExprContext** vconjunct_ctx_ptr,
-        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
+        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
+        const std::unordered_map<std::string, int>* colname_to_slot_id) {
     RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
     _colname_to_value_range = colname_to_value_range;
+    _col_name_to_slot_id = colname_to_slot_id;
 
     _get_block_timer = ADD_TIMER(_parent->_scanner_profile, 
"FileScannerGetBlockTime");
     _cast_to_input_block_timer =
@@ -101,6 +104,59 @@ Status VFileScanner::prepare(
     return Status::OK();
 }
 
+Status VFileScanner::_split_conjuncts(VExpr* conjunct_expr_root) {
+    static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
+    if (conjunct_expr_root != nullptr) {
+        if (is_leaf(conjunct_expr_root)) {
+            auto impl = conjunct_expr_root->get_impl();
+            // If impl is not null, which means this a conjuncts from runtime filter.
+            VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
+            VExprContext* new_ctx = _state->obj_pool()->add(new VExprContext(cur_expr));
+            _vconjunct_ctx->clone_fn_contexts(new_ctx);
+            RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc));
+            RETURN_IF_ERROR(new_ctx->open(_state));
+
+            std::vector<int> slot_ids;
+            _get_slot_ids(cur_expr, &slot_ids);
+            if (slot_ids.size() == 0) {
+                _not_single_slot_filter_conjuncts.emplace_back(new_ctx);
+                return Status::OK();
+            }
+            bool single_slot = true;
+            for (int i = 1; i < slot_ids.size(); i++) {
+                if (slot_ids[i] != slot_ids[0]) {
+                    single_slot = false;
+                    break;
+                }
+            }
+            if (single_slot) {
+                SlotId slot_id = slot_ids[0];
+                if (_slot_id_to_filter_conjuncts.find(slot_id) ==
+                    _slot_id_to_filter_conjuncts.end()) {
+                    _slot_id_to_filter_conjuncts.insert({slot_id, std::vector<VExprContext*>()});
+                }
+                _slot_id_to_filter_conjuncts[slot_id].emplace_back(new_ctx);
+            } else {
+                _not_single_slot_filter_conjuncts.emplace_back(new_ctx);
+            }
+        } else {
+            RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[0]));
+            RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[1]));
+        }
+    }
+    return Status::OK();
+}
+
+void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
+    for (VExpr* child_expr : expr->children()) {
+        if (child_expr->is_slot_ref()) {
+            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr);
+            slot_ids->emplace_back(slot_ref->slot_id());
+        }
+        _get_slot_ids(child_expr, slot_ids);
+    }
+}
+
 Status VFileScanner::open(RuntimeState* state) {
     RETURN_IF_ERROR(VScanner::open(state));
     RETURN_IF_ERROR(_init_expr_ctxes());
@@ -481,7 +537,7 @@ Status VFileScanner::_get_next_reader() {
         case TFileFormatType::FORMAT_PARQUET: {
             ParquetReader* parquet_reader = new ParquetReader(
                     _profile, _params, range, _state->query_options().batch_size,
-                    const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get());
+                    const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state);
             RETURN_IF_ERROR(parquet_reader->open());
             if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
@@ -492,14 +548,18 @@ Status VFileScanner::_get_next_reader() {
                 IcebergTableReader* iceberg_reader =
                         new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
                                                _params, range, _kv_cache, _io_ctx.get());
-                init_status = iceberg_reader->init_reader(_file_col_names, _col_id_name_map,
-                                                          _colname_to_value_range, _push_down_expr);
+                init_status = iceberg_reader->init_reader(
+                        _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_expr,
+                        _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                 RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
                 std::vector<std::string> place_holder;
-                init_status = parquet_reader->init_reader(_file_col_names, place_holder,
-                                                          _colname_to_value_range, _push_down_expr);
+                init_status = parquet_reader->init_reader(
+                        _file_col_names, place_holder, _colname_to_value_range, _push_down_expr,
+                        _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                 _cur_reader.reset((GenericReader*)parquet_reader);
             }
             break;
@@ -749,6 +809,20 @@ Status VFileScanner::close(RuntimeState* state) {
         _push_down_expr->close(state);
     }
 
+    for (auto& [k, v] : _slot_id_to_filter_conjuncts) {
+        for (auto& ctx : v) {
+            if (ctx != nullptr) {
+                ctx->close(state);
+            }
+        }
+    }
+
+    for (auto* ctx : _not_single_slot_filter_conjuncts) {
+        if (ctx != nullptr) {
+            ctx->close(state);
+        }
+    }
+
     if (config::enable_file_cache) {
         io::FileCacheProfileReporter cache_profile(_profile);
         cache_profile.update(_file_cache_statistics.get());
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 4e503da5ef..410d357d18 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -42,7 +42,8 @@ public:
 
 public:
     Status prepare(VExprContext** vconjunct_ctx_ptr,
-                   std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+                   std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+                   const std::unordered_map<std::string, int>* colname_to_slot_id);
 
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
@@ -106,6 +107,7 @@ protected:
     int _rows = 0;
     int _num_of_columns_from_file;
 
+    bool _src_block_mem_reuse = false;
     bool _strict_mode;
 
     bool _src_block_init = false;
@@ -114,6 +116,8 @@ protected:
 
     VExprContext* _push_down_expr = nullptr;
     bool _is_dynamic_schema = false;
+    // for tracing dynamic schema
+    std::unique_ptr<vectorized::schema_util::FullBaseSchemaView> _full_base_schema_view;
 
     std::unique_ptr<FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<IOContext> _io_ctx;
@@ -126,6 +130,12 @@ private:
     RuntimeProfile::Counter* _pre_filter_timer = nullptr;
     RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
 
+    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    // single slot filter conjuncts
+    std::unordered_map<int, std::vector<VExprContext*>> _slot_id_to_filter_conjuncts;
+    // not single(zero or multi) slot filter conjuncts
+    std::vector<VExprContext*> _not_single_slot_filter_conjuncts;
+
 private:
     Status _init_expr_ctxes();
     Status _init_src_block(Block* block);
@@ -135,6 +145,9 @@ private:
     Status _pre_filter_src_block();
     Status _convert_to_output_block(Block* block);
     Status _generate_fill_columns();
+    Status _handle_dynamic_block(Block* block);
+    Status _split_conjuncts(VExpr* conjunct_expr_root);
+    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
 
     void _reset_counter() {
         _counter.num_rows_unselected = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index f019df4b60..f8fa213d33 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -447,6 +447,8 @@ Status VScanNode::_normalize_conjuncts() {
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
 
     for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
+        _colname_to_slot_id[slots[slot_idx]->col_name()] = slots[slot_idx]->id();
+
         auto type = slots[slot_idx]->type().type;
         if (slots[slot_idx]->type().type == TYPE_ARRAY) {
             type = slots[slot_idx]->type().children[0].type;
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 51dc2edb1b..de641e6055 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -311,6 +311,8 @@ protected:
     RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage;
     RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage;
 
+    std::unordered_map<std::string, int> _colname_to_slot_id;
+
 private:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter();
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 42ff7cf538..4ca7e55ecd 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -108,7 +108,8 @@ TEST_F(ParquetReaderTest, normal) {
         scan_range.start_offset = 0;
         scan_range.size = 1000;
     }
-    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr);
+    auto p_reader =
+            new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr);
     p_reader->set_file_reader(reader);
     RuntimeState runtime_state((TQueryGlobals()));
     runtime_state.set_desc_tbl(desc_tbl);
@@ -116,7 +117,8 @@ TEST_F(ParquetReaderTest, normal) {
 
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
     p_reader->open();
-    p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr);
+    p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr, nullptr, nullptr,
+                          nullptr, nullptr, nullptr);
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
             partition_columns;
     std::unordered_map<std::string, VExprContext*> missing_columns;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c7e4894d20..9426505489 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -191,7 +191,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
         // required column
         std::vector<u_short> null_map = {(u_short)rows};
         run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-        return chunk_reader.decode_values(data_column, data_type, run_length_map);
+        return chunk_reader.decode_values(data_column, data_type, run_length_map, false);
     } else {
         // column with null values
         level_t level_type = definitions[0];
@@ -204,8 +204,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
                 } else {
                     std::vector<u_short> null_map = {(u_short)num_values};
                     run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-                    RETURN_IF_ERROR(
-                            chunk_reader.decode_values(data_column, data_type, run_length_map));
+                    RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type,
+                                                               run_length_map, false));
                 }
                 level_type = definitions[i];
                 num_values = 1;
@@ -219,7 +219,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
         } else {
             std::vector<u_short> null_map = {(u_short)num_values};
             run_length_map.set_run_length_null_map(null_map, rows, nullptr);
-            RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type, run_length_map));
+            RETURN_IF_ERROR(
+                    chunk_reader.decode_values(data_column, data_type, run_length_map, false));
         }
         return Status::OK();
     }
@@ -421,12 +422,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     std::shared_ptr<RowGroupReader> row_group_reader;
     RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
     row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz,
-                                              position_delete_ctx, lazy_read_ctx));
+                                              position_delete_ctx, lazy_read_ctx, nullptr));
     std::vector<RowRange> row_ranges;
     row_ranges.emplace_back(0, row_group.num_rows);
 
     auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
-    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets);
+    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets, nullptr,
+                                      nullptr, nullptr, nullptr, nullptr);
     EXPECT_TRUE(stg.ok());
 
     vectorized::Block block;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to