This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b5bb565a7 [feature-wip](parquet-reader) parquet dictionary decoder 
(#11981)
0b5bb565a7 is described below

commit 0b5bb565a7497463e79e1efce7f6739e48b1deba
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Fri Aug 26 19:24:37 2022 +0800

    [feature-wip](parquet-reader) parquet dictionary decoder (#11981)
    
    Parse parquet data with dictionary encoding.
    
    Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 
specification.
    Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page 
for Parquet 2.0+ files.
    refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
---
 be/src/util/rle_encoding.h                         | 291 +++++++++++++++++++
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 176 +++++++-----
 be/src/vec/exec/format/parquet/parquet_common.h    | 151 ++++++----
 .../parquet/vparquet_column_chunk_reader.cpp       |  56 +++-
 .../format/parquet/vparquet_column_chunk_reader.h  |   4 +-
 .../test_data/parquet_scanner/dict-decoder.parquet | Bin 0 -> 4282 bytes
 .../test_data/parquet_scanner/dict-decoder.txt     |  16 ++
 .../test_data/parquet_scanner/type-decoder.txt     |  14 +
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 315 ++++++---------------
 9 files changed, 657 insertions(+), 366 deletions(-)

diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index 08a7a23a4d..1409473a09 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -568,4 +568,295 @@ inline void RleEncoder<T>::Clear() {
     bit_writer_.Clear();
 }
 
+// Copy from 
https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
+// Utility classes to do run length encoding (RLE) for fixed bit width values. 
 If runs
+// are sufficiently long, RLE is used, otherwise, the values are just 
bit-packed
+// (literal encoding).
+//
+// For both types of runs, there is a byte-aligned indicator which encodes the 
length
+// of the run and the type of the run.
+//
+// This encoding has the benefit that when there aren't any long enough runs, 
values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the 
value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+//    encoded-block := run*
+//    run := literal-run | repeated-run
+//    literal-run := literal-indicator < literal bytes >
+//    repeated-run := repeated-indicator < repeated value. padded to byte 
boundary >
+//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
+//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The 
rest
+// of the varint is used to determine the length of the run (eg how many times 
the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e. 
encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence 
will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups 
rather than
+// the actual number of encoded ints. (This means that the total number of 
encoded values
+// can not be determined from the encoded data, since the number of values in 
the last
+// group may not be a multiple of 8). For the last group of literal runs, we 
pad
+// the group to 8 with zeros. This allows for 8 at a time decoding on the read 
side
+// without the need for additional checks.
+//
+// There is a break-even point when it is more storage efficient to do run 
length
+// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 
bytes
+// for both the repeated encoding or the literal encoding.  This value can 
always
+// be computed based on the bit-width.
+// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but 
more
+// investigation is needed to do this efficiently, see the reverted 
IMPALA-6658.
+// TODO: think about how to use this for strings.  The bit packing isn't quite 
the same.
+//
+// Examples with bit-width 1 (eg encoding booleans):
+// ----------------------------------------
+// 100 1s followed by 100 0s:
+// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 
byte>
+//  - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+
+// RLE decoder with a batch-oriented interface that enables fast decoding.
+// Users of this class must first initialize the class to point to a buffer of
+// RLE-encoded data, passed into the constructor or Reset(). The provided
+// bit_width must be at most min(sizeof(T) * 8, 
BatchedBitReader::MAX_BITWIDTH).
+// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
+// see if the next run is a repeated or literal run, then calling
+// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
+//
+// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
+// Other decoding errors are signalled by functions returning false. If an
+// error is encountered then it is not valid to read any more data until
+// Reset() is called.
+
+template <typename T>
+class RleBatchDecoder {
+public:
+    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
+        Reset(buffer, buffer_len, bit_width);
+    }
+
+    RleBatchDecoder() = default;
+
+    // Reset the decoder to read from a new buffer.
+    void Reset(uint8_t* buffer, int buffer_len, int bit_width);
+
+    // Return the size of the current repeated run. Returns zero if the 
current run is
+    // a literal run or if no more runs can be read from the input.
+    int32_t NextNumRepeats();
+
+    // Get the value of the current repeated run and consume the given number 
of repeats.
+    // Only valid to call when NextNumRepeats() > 0. The given number of 
repeats cannot
+    // be greater than the remaining number of repeats in the run. 
'num_repeats_to_consume'
+    // can be set to 0 to peek at the value without consuming repeats.
+    T GetRepeatedValue(int32_t num_repeats_to_consume);
+
+    // Return the size of the current literal run. Returns zero if the current 
run is
+    // a repeated run or if no more runs can be read from the input.
+    int32_t NextNumLiterals();
+
+    // Consume 'num_literals_to_consume' literals from the current literal run,
+    // copying the values to 'values'. 'num_literals_to_consume' must be <=
+    // NextNumLiterals(). Returns true if the requested number of literals were
+    // successfully read or false if an error was encountered, e.g. the input 
was
+    // truncated.
+    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) 
WARN_UNUSED_RESULT;
+
+    // Consume 'num_values_to_consume' values and copy them to 'values'.
+    // Returns the number of consumed values or 0 if an error occurred.
+    int32_t GetBatch(T* values, int32_t batch_num);
+
+private:
+    // Called when both 'literal_count_' and 'repeat_count_' have been 
exhausted.
+    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next 
literal
+    // or repeated run, or leaves both at 0 if no more values can be read 
(either because
+    // the end of the input was reached or an error was encountered decoding).
+    void NextCounts();
+
+    /// Fill the literal buffer. Invalid to call if there are already buffered 
literals.
+    /// Return false if the input was truncated. This does not advance 
'literal_count_'.
+    bool FillLiteralBuffer() WARN_UNUSED_RESULT;
+
+    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < 
num_buffered_literals_; }
+
+    /// Output buffered literals, advancing 'literal_buffer_pos_' and 
decrementing
+    /// 'literal_count_'. Returns the number of literals outputted.
+    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
+
+    BatchedBitReader bit_reader_;
+
+    // Number of bits needed to encode the value. Must be between 0 and 64 
after
+    // the decoder is initialized with a buffer. -1 indicates the decoder was 
not
+    // initialized.
+    int bit_width_ = -1;
+
+    // If a repeated run, the number of repeats remaining in the current run 
to be read.
+    // If the current run is a literal run, this is 0.
+    int32_t repeat_count_ = 0;
+
+    // If a literal run, the number of literals remaining in the current run 
to be read.
+    // If the current run is a repeated run, this is 0.
+    int32_t literal_count_ = 0;
+
+    // If a repeated run, the current repeated value.
+    T repeated_value_;
+
+    // Size of buffer for literal values. Large enough to decode a full batch 
of 32
+    // literals. The buffer is needed to allow clients to read in batches that 
are not
+    // multiples of 32.
+    static constexpr int LITERAL_BUFFER_LEN = 32;
+
+    // Buffer containing 'num_buffered_literals_' values. 
'literal_buffer_pos_' is the
+    // position of the next literal to be read from the buffer.
+    T literal_buffer_[LITERAL_BUFFER_LEN];
+    int num_buffered_literals_ = 0;
+    int literal_buffer_pos_ = 0;
+};
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t 
max_to_output, T* values) {
+    int32_t num_to_output =
+            std::min<int32_t>(max_to_output, num_buffered_literals_ - 
literal_buffer_pos_);
+    memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * 
num_to_output);
+    literal_buffer_pos_ += num_to_output;
+    literal_count_ -= num_to_output;
+    return num_to_output;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int 
bit_width) {
+    bit_reader_.Reset(buffer, buffer_len);
+    bit_width_ = bit_width;
+    repeat_count_ = 0;
+    literal_count_ = 0;
+    num_buffered_literals_ = 0;
+    literal_buffer_pos_ = 0;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
+    if (repeat_count_ > 0) return repeat_count_;
+    if (literal_count_ == 0) NextCounts();
+    return repeat_count_;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::NextCounts() {
+    // Read the next run's indicator int, it could be a literal or repeated 
run.
+    // The int is encoded as a ULEB128-encoded value.
+    uint32_t indicator_value = 0;
+    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
+        return;
+    }
+
+    // lsb indicates if it is a literal run or repeated run
+    bool is_literal = indicator_value & 1;
+
+    // Don't try to handle run lengths that don't fit in an int32_t - just 
fail gracefully.
+    // The Parquet standard does not allow longer runs - see PARQUET-1290.
+    uint32_t run_len = indicator_value >> 1;
+    if (is_literal) {
+        // Use int64_t to avoid overflowing multiplication.
+        int64_t literal_count = static_cast<int64_t>(run_len) * 8;
+        if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) 
return;
+        literal_count_ = literal_count;
+    } else {
+        if (UNLIKELY(run_len == 0)) return;
+        bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), 
&repeated_value_);
+        if (UNLIKELY(!result)) return;
+        repeat_count_ = run_len;
+    }
+}
+
+template <typename T>
+inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
+    repeat_count_ -= num_repeats_to_consume;
+    return repeated_value_;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
+    if (literal_count_ > 0) return literal_count_;
+    if (repeat_count_ == 0) NextCounts();
+    return literal_count_;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetLiteralValues(int32_t 
num_literals_to_consume, T* values) {
+    int32_t num_consumed = 0;
+    // Copy any buffered literals left over from previous calls.
+    if (HaveBufferedLiterals()) {
+        num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
+    }
+
+    int32_t num_remaining = num_literals_to_consume - num_consumed;
+    // Copy literals directly to the output, bypassing 'literal_buffer_' when 
possible.
+    // Need to round to a batch of 32 if the caller is consuming only part of 
the current
+    // run avoid ending on a non-byte boundary.
+    int32_t num_to_bypass =
+            std::min<int32_t>(literal_count_, 
BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+    if (num_to_bypass > 0) {
+        int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, 
values + num_consumed);
+        // If we couldn't read the expected number, that means the input was 
truncated.
+        if (num_read < num_to_bypass) return false;
+        literal_count_ -= num_to_bypass;
+        num_consumed += num_to_bypass;
+        num_remaining = num_literals_to_consume - num_consumed;
+    }
+
+    if (num_remaining > 0) {
+        // We weren't able to copy all the literals requested directly from 
the input.
+        // Buffer literals and copy over the requested number.
+        if (UNLIKELY(!FillLiteralBuffer())) return false;
+        OutputBufferedLiterals(num_remaining, values + num_consumed);
+    }
+    return true;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
+    int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, 
literal_count_);
+    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, 
num_to_buffer, literal_buffer_);
+    // If we couldn't read the expected number, that means the input was 
truncated.
+    if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
+    literal_buffer_pos_ = 0;
+    return true;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::GetBatch(T* values, int32_t batch_num) {
+    int32_t num_consumed = 0;
+    while (num_consumed < batch_num) {
+        // Add RLE encoded values by repeating the current value this number 
of times.
+        int32_t num_repeats = NextNumRepeats();
+        if (num_repeats > 0) {
+            int32_t num_repeats_to_set = std::min(num_repeats, batch_num - 
num_consumed);
+            T repeated_value = GetRepeatedValue(num_repeats_to_set);
+            for (int i = 0; i < num_repeats_to_set; ++i) {
+                values[num_consumed + i] = repeated_value;
+            }
+            num_consumed += num_repeats_to_set;
+            continue;
+        }
+
+        // Add remaining literal values, if any.
+        int32_t num_literals = NextNumLiterals();
+        if (num_literals == 0) {
+            break;
+        }
+        int32_t num_literals_to_set = std::min(num_literals, batch_num - 
num_consumed);
+        if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
+            return 0;
+        }
+        num_consumed += num_literals_to_set;
+    }
+    return num_consumed;
+}
+
 } // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index be4ec35223..347db41d86 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -44,12 +44,16 @@ Status Decoder::get_decoder(tparquet::Type::type type, 
tparquet::Encoding::type
                             std::unique_ptr<Decoder>& decoder) {
     switch (encoding) {
     case tparquet::Encoding::PLAIN:
+    case tparquet::Encoding::RLE_DICTIONARY:
         switch (type) {
         case tparquet::Type::BOOLEAN:
+            if (encoding != tparquet::Encoding::PLAIN) {
+                return Status::InternalError("Bool type can't has dictionary 
page");
+            }
             decoder.reset(new BoolPlainDecoder());
             break;
         case tparquet::Type::BYTE_ARRAY:
-            decoder.reset(new ByteArrayPlainDecoder());
+            decoder.reset(new ByteArrayDecoder());
             break;
         case tparquet::Type::INT32:
         case tparquet::Type::INT64:
@@ -57,14 +61,12 @@ Status Decoder::get_decoder(tparquet::Type::type type, 
tparquet::Encoding::type
         case tparquet::Type::FLOAT:
         case tparquet::Type::DOUBLE:
         case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
-            decoder.reset(new PlainDecoder(type));
+            decoder.reset(new FixLengthDecoder(type));
             break;
         default:
-            return Status::InternalError("Unsupported plain type {} in parquet 
decoder",
+            return Status::InternalError("Unsupported type {} in parquet 
decoder",
                                          tparquet::to_string(type));
         }
-    case tparquet::Encoding::RLE_DICTIONARY:
-        break;
     default:
         return Status::InternalError("Unsupported encoding {} in parquet 
decoder",
                                      tparquet::to_string(encoding));
@@ -118,39 +120,55 @@ Status Decoder::decode_values(ColumnPtr& doris_column, 
DataTypePtr& data_type, s
     return decode_values(data_column, data_type, num_values);
 }
 
-Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
-    size_t to_read_bytes = _type_length * num_values;
-    if (UNLIKELY(to_read_bytes > slice.size)) {
-        return Status::IOError("Slice does not have enough space to write out 
the decoding data");
-    }
-    memcpy(slice.data, _data->data + _offset, to_read_bytes);
-    _offset += to_read_bytes;
+Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t 
dict_size) {
+    _has_dict = true;
+    _dict = std::move(dict);
     return Status::OK();
 }
 
-Status PlainDecoder::skip_values(size_t num_values) {
-    _offset += _type_length * num_values;
-    if (UNLIKELY(_offset > _data->size)) {
-        return Status::IOError("Out-of-bounds access in parquet data decoder");
+void FixLengthDecoder::set_data(Slice* data) {
+    _data = data;
+    _offset = 0;
+    if (_has_dict) {
+        uint8_t bit_width = *data->data;
+        _index_batch_decoder.reset(
+                new 
RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
+                                              static_cast<int>(data->size) - 
1, bit_width));
+    }
+}
+
+Status FixLengthDecoder::skip_values(size_t num_values) {
+    if (_has_dict) {
+        _indexes.resize(num_values);
+        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+    } else {
+        _offset += _type_length * num_values;
+        if (UNLIKELY(_offset > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data 
decoder");
+        }
     }
     return Status::OK();
 }
 
-Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t 
num_values,
-                                       size_t real_length) {
+Status FixLengthDecoder::_decode_short_int(MutableColumnPtr& doris_column, 
size_t num_values,
+                                           size_t real_length) {
     if (UNLIKELY(_physical_type != tparquet::Type::INT32)) {
         return Status::InternalError("Short int can only be decoded from 
INT32");
     }
     for (int i = 0; i < num_values; ++i) {
-        doris_column->insert_data(_data->data + _offset, real_length);
-        _offset += _type_length;
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+        doris_column->insert_data(buf_start, real_length);
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
-Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                   size_t num_values) {
-    if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
+Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                       size_t num_values) {
+    if (_has_dict) {
+        _indexes.resize(num_values);
+        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+    } else if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -228,8 +246,9 @@ Status PlainDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypePtr&
     case TypeIndex::FixedString:
         if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
             for (int i = 0; i < num_values; ++i) {
-                doris_column->insert_data(_data->data + _offset, _type_length);
-                _offset += _type_length;
+                char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+                doris_column->insert_data(buf_start, _type_length);
+                _FIXED_SHIFT_DATA_OFFSET();
             }
             return Status::OK();
         }
@@ -243,48 +262,37 @@ Status PlainDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypePtr&
                                    getTypeName(data_type->get_type_id()));
 }
 
-Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
-    uint32_t slice_offset = 0;
-    for (int i = 0; i < num_values; ++i) {
-        if (UNLIKELY(_offset + 4 > _data->size)) {
-            return Status::IOError("Can't read byte array length from plain 
decoder");
-        }
-        uint32_t length =
-                decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
-        _offset += 4;
-        if (UNLIKELY(_offset + length) > _data->size) {
-            return Status::IOError("Can't read enough bytes in plain decoder");
-        }
-        memcpy(slice.data + slice_offset, _data->data + _offset, length);
-        slice_offset += length + 1;
-        slice.data[slice_offset - 1] = '\0';
-        _offset += length;
+Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t 
dict_size) {
+    _has_dict = true;
+    _dict = std::move(dict);
+    _dict_offsets.resize(dict_size + 1);
+    uint32_t offset_cursor = 0;
+    for (int i = 0; i < dict_size; ++i) {
+        uint32_t length = decode_fixed32_le(_dict.get() + offset_cursor);
+        offset_cursor += 4;
+        _dict_offsets[i] = offset_cursor;
+        offset_cursor += length;
     }
+    _dict_offsets[dict_size] = offset_cursor + 4;
     return Status::OK();
 }
 
-Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
-    for (int i = 0; i < num_values; ++i) {
-        if (UNLIKELY(_offset + 4 > _data->size)) {
-            return Status::IOError("Can't read byte array length from plain 
decoder");
-        }
-        uint32_t length =
-                decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
-        _offset += 4;
-        if (UNLIKELY(_offset + length) > _data->size) {
-            return Status::IOError("Can't skip enough bytes in plain decoder");
-        }
-        _offset += length;
+void ByteArrayDecoder::set_data(Slice* data) {
+    _data = data;
+    _offset = 0;
+    if (_has_dict) {
+        uint8_t bit_width = *data->data;
+        _index_batch_decoder.reset(
+                new 
RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
+                                              static_cast<int>(data->size) - 
1, bit_width));
     }
-    return Status::OK();
 }
 
-Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                            size_t num_values) {
-    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
-    switch (logical_type) {
-    case TypeIndex::String:
-    case TypeIndex::FixedString:
+Status ByteArrayDecoder::skip_values(size_t num_values) {
+    if (_has_dict) {
+        _indexes.resize(num_values);
+        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+    } else {
         for (int i = 0; i < num_values; ++i) {
             if (UNLIKELY(_offset + 4 > _data->size)) {
                 return Status::IOError("Can't read byte array length from 
plain decoder");
@@ -293,11 +301,44 @@ Status 
ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
                     decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
             _offset += 4;
             if (UNLIKELY(_offset + length) > _data->size) {
-                return Status::IOError("Can't read enough bytes in plain 
decoder");
+                return Status::IOError("Can't skip enough bytes in plain 
decoder");
             }
-            doris_column->insert_data(_data->data + _offset, length);
             _offset += length;
         }
+    }
+    return Status::OK();
+}
+
+Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                       size_t num_values) {
+    if (_has_dict) {
+        _indexes.resize(num_values);
+        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+    }
+    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+    switch (logical_type) {
+    case TypeIndex::String:
+    case TypeIndex::FixedString:
+        for (int i = 0; i < num_values; ++i) {
+            if (_has_dict) {
+                uint32_t idx = _indexes[i];
+                uint32_t idx_cursor = _dict_offsets[idx];
+                char* buff_start = reinterpret_cast<char*>(_dict.get() + 
idx_cursor);
+                doris_column->insert_data(buff_start, _dict_offsets[idx + 1] - 
idx_cursor - 4);
+            } else {
+                if (UNLIKELY(_offset + 4 > _data->size)) {
+                    return Status::IOError("Can't read byte array length from 
plain decoder");
+                }
+                uint32_t length =
+                        decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
+                _offset += 4;
+                if (UNLIKELY(_offset + length) > _data->size) {
+                    return Status::IOError("Can't read enough bytes in plain 
decoder");
+                }
+                doris_column->insert_data(_data->data + _offset, length);
+                _offset += length;
+            }
+        }
         return Status::OK();
     case TypeIndex::Decimal32:
         return _decode_binary_decimal<Int32>(doris_column, data_type, 
num_values);
@@ -313,17 +354,6 @@ Status 
ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
             getTypeName(data_type->get_type_id()));
 }
 
-Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) {
-    bool value;
-    for (int i = 0; i < num_values; ++i) {
-        if (UNLIKELY(!_decode_value(&value))) {
-            return Status::IOError("Can't read enough booleans in plain 
decoder");
-        }
-        slice.data[i] = value ? 1 : 0;
-    }
-    return Status::OK();
-}
-
 Status BoolPlainDecoder::skip_values(size_t num_values) {
     int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, 
(int)num_values);
     unpacked_value_idx_ += skip_cached;
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index 3620721ddd..9504fd32b5 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -25,6 +25,7 @@
 #include "schema_desc.h"
 #include "util/bit_stream_utils.inline.h"
 #include "util/coding.h"
+#include "util/rle_encoding.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
@@ -110,10 +111,12 @@ public:
     virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                                  size_t num_values) = 0;
 
-    virtual Status decode_values(Slice& slice, size_t num_values) = 0;
-
     virtual Status skip_values(size_t num_values) = 0;
 
+    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t 
dict_size) {
+        return Status::NotSupported("set_dict is not supported");
+    }
+
 protected:
     int32_t _type_length;
     Slice* _data = nullptr;
@@ -146,33 +149,25 @@ void Decoder::init_decimal_converter(DataTypePtr& 
data_type) {
     }
 }
 
-class PlainDecoder final : public Decoder {
+class FixLengthDecoder final : public Decoder {
 public:
-    PlainDecoder(tparquet::Type::type physical_type) : 
_physical_type(physical_type) {};
-    ~PlainDecoder() override = default;
+    FixLengthDecoder(tparquet::Type::type physical_type) : 
_physical_type(physical_type) {};
+    ~FixLengthDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                          size_t num_values) override;
 
-    Status decode_values(Slice& slice, size_t num_values) override;
-
     Status skip_values(size_t num_values) override;
 
+    Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) 
override;
+
+    void set_data(Slice* data) override;
+
 protected:
     Status _decode_short_int(MutableColumnPtr& doris_column, size_t 
num_values, size_t real_length);
 
     template <typename Numeric>
-    Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
-        size_t to_read_bytes = _type_length * num_values;
-        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data 
decoder");
-        }
-        auto& column_data = 
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
-        const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + 
_offset);
-        column_data.insert(raw_data, raw_data + num_values);
-        _offset += to_read_bytes;
-        return Status::OK();
-    }
+    Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values);
 
     template <typename CppType, typename ColumnType>
     Status _decode_date(MutableColumnPtr& doris_column, TypeIndex& 
logical_type, size_t num_values);
@@ -193,16 +188,44 @@ protected:
     Status _decode_primitive_decimal(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
                                      size_t num_values);
 
+#define _FIXED_GET_DATA_OFFSET(index)                                          
       \
+    _has_dict ? reinterpret_cast<char*>(_dict.get() + _indexes[index] * 
_type_length) \
+              : _data->data + _offset
+
+#define _FIXED_SHIFT_DATA_OFFSET() \
+    if (!_has_dict) _offset += _type_length
+
     tparquet::Type::type _physical_type;
+    // For dictionary encoding
+    bool _has_dict = false;
+    std::unique_ptr<uint8_t[]> _dict = nullptr;
+    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+    std::vector<uint32_t> _indexes;
 };
 
+template <typename Numeric>
+Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column, 
size_t num_values) {
+    if (_has_dict) {
+        for (int i = 0; i < num_values; ++i) {
+            char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+            doris_column->insert_data(buf_start, _type_length);
+        }
+    } else {
+        auto& column_data = 
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
+        const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + 
_offset);
+        column_data.insert(raw_data, raw_data + num_values);
+        _offset += _type_length * num_values;
+    }
+    return Status::OK();
+}
+
 template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& 
logical_type,
-                                  size_t num_values) {
+Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column, 
TypeIndex& logical_type,
+                                      size_t num_values) {
     auto& column_data = 
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
     for (int i = 0; i < num_values; ++i) {
-        int64_t date_value =
-                static_cast<int64_t>(*reinterpret_cast<int32_t*>(_data->data + 
_offset));
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+        int64_t date_value = 
static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
         CppType v;
         v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // 
day to seconds
         if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
@@ -211,17 +234,18 @@ Status PlainDecoder::_decode_date(MutableColumnPtr& 
doris_column, TypeIndex& log
         }
         ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
         column_data.emplace_back(cast_value);
-        _offset += _type_length;
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
 template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, 
TypeIndex& logical_type,
-                                        size_t num_values) {
+Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, 
TypeIndex& logical_type,
+                                            size_t num_values) {
     auto& column_data = 
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
     for (int i = 0; i < num_values; i++) {
-        int64_t& date_value = *reinterpret_cast<int64_t*>(_data->data + 
_offset);
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+        int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
         CppType v;
         v.from_unixtime(date_value / _decode_params->second_mask, 
*_decode_params->ctz);
         if constexpr (std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
@@ -231,17 +255,18 @@ Status PlainDecoder::_decode_datetime64(MutableColumnPtr& 
doris_column, TypeInde
         }
         ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
         column_data.emplace_back(cast_value);
-        _offset += _type_length;
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
 template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, 
TypeIndex& logical_type,
-                                        size_t num_values) {
+Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column, 
TypeIndex& logical_type,
+                                            size_t num_values) {
     auto& column_data = 
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
     for (int i = 0; i < num_values; ++i) {
-        ParquetInt96& datetime96 = 
*reinterpret_cast<ParquetInt96*>(_data->data + _offset);
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+        ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(buf_start);
         CppType v;
         int64_t micros = datetime96.to_timestamp_micros();
         v.from_unixtime(micros / 1000000, *_decode_params->ctz);
@@ -252,20 +277,20 @@ Status PlainDecoder::_decode_datetime96(MutableColumnPtr& 
doris_column, TypeInde
         }
         ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
         column_data.emplace_back(cast_value);
-        _offset += _type_length;
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
 template <typename DecimalPrimitiveType>
-Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
-                                            size_t num_values) {
+Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+                                                DataTypePtr& data_type, size_t 
num_values) {
     init_decimal_converter<DecimalPrimitiveType>(data_type);
     auto& column_data =
             
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
     for (int i = 0; i < num_values; ++i) {
-        char* buf_start = _data->data + _offset;
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
         // When Decimal in parquet is stored in byte arrays, binary and fixed,
         // the unscaled number must be encoded as two's complement using 
big-endian byte order.
         Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@@ -279,21 +304,22 @@ Status 
PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, Data
         }
         DecimalPrimitiveType cast_value(value);
         
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
-        _offset += _type_length;
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
 template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
-Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
-                                               DataTypePtr& data_type, size_t 
num_values) {
+Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& 
doris_column,
+                                                   DataTypePtr& data_type, 
size_t num_values) {
     init_decimal_converter<DecimalPrimitiveType>(data_type);
     auto& column_data =
             
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
     for (int i = 0; i < num_values; ++i) {
+        char* buf_start = _FIXED_GET_DATA_OFFSET(i);
         // we should use decimal128 to scale up/down
-        Int128 value = *reinterpret_cast<DecimalPhysicalType*>(_data->data + 
_offset);
+        Int128 value = *reinterpret_cast<DecimalPhysicalType*>(buf_start);
         if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
             value *= scale_params.scale_factor;
         } else if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
@@ -301,44 +327,62 @@ Status 
PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
         }
         DecimalPrimitiveType cast_value(value);
         
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
-        _offset += _type_length;
+        _FIXED_SHIFT_DATA_OFFSET();
     }
     return Status::OK();
 }
 
-class ByteArrayPlainDecoder final : public Decoder {
+class ByteArrayDecoder final : public Decoder {
 public:
-    ByteArrayPlainDecoder() = default;
-    ~ByteArrayPlainDecoder() override = default;
+    ByteArrayDecoder() = default;
+    ~ByteArrayDecoder() override = default;
 
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                          size_t num_values) override;
 
-    Status decode_values(Slice& slice, size_t num_values) override;
-
     Status skip_values(size_t num_values) override;
 
+    void set_data(Slice* data) override;
+
+    Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) 
override;
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                                   size_t num_values);
+
+    // For dictionary encoding
+    bool _has_dict = false;
+    std::unique_ptr<uint8_t[]> _dict = nullptr;
+    std::vector<uint32_t> _dict_offsets;
+    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+    std::vector<uint32_t> _indexes;
 };
 
 template <typename DecimalPrimitiveType>
-Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& 
doris_column,
-                                                     DataTypePtr& data_type, 
size_t num_values) {
+Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+                                                DataTypePtr& data_type, size_t 
num_values) {
     init_decimal_converter<DecimalPrimitiveType>(data_type);
     auto& column_data =
             
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
     for (int i = 0; i < num_values; ++i) {
-        if (UNLIKELY(_offset + 4 > _data->size)) {
-            return Status::IOError("Can't read byte array length from plain 
decoder");
+        char* buf_start;
+        uint32_t length;
+        if (_has_dict) {
+            uint32_t idx = _indexes[i];
+            uint32_t idx_cursor = _dict_offsets[idx];
+            buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
+            length = _dict_offsets[idx + 1] - idx_cursor - 4;
+        } else {
+            if (UNLIKELY(_offset + 4 > _data->size)) {
+                return Status::IOError("Can't read byte array length from 
plain decoder");
+            }
+            length = decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
+            _offset += 4;
+            buf_start = _data->data + _offset;
+            _offset += length;
         }
-        uint32_t length =
-                decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset);
-        _offset += 4;
-        char* buf_start = _data->data + _offset;
         // When Decimal in parquet is stored in byte arrays, binary and fixed,
         // the unscaled number must be encoded as two's complement using 
big-endian byte order.
         Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@@ -351,7 +395,6 @@ Status 
ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_col
         }
         DecimalPrimitiveType cast_value(value);
         
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
-        _offset += length;
     }
     return Status::OK();
 }
@@ -374,8 +417,6 @@ public:
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                          size_t num_values) override;
 
-    Status decode_values(Slice& slice, size_t num_values) override;
-
     Status skip_values(size_t num_values) override;
 
 protected:
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 778e7b1a66..a697fc5038 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -119,7 +119,43 @@ Status ColumnChunkReader::load_page_data() {
 Status ColumnChunkReader::_decode_dict_page() {
     const tparquet::PageHeader& header = *_page_reader->get_page_header();
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
-    // TODO(gaoxin): decode dictionary page
+
+    // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 
specification.
+    // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary 
page for Parquet 2.0+ files.
+    // refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
+    tparquet::Encoding::type dict_encoding = 
header.dictionary_page_header.encoding;
+    if (dict_encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+        dict_encoding != tparquet::Encoding::PLAIN) {
+        return Status::InternalError("Unsupported dictionary encoding {}",
+                                     tparquet::to_string(dict_encoding));
+    }
+
+    // Prepare dictionary data
+    int32_t uncompressed_size = header.uncompressed_page_size;
+    std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
+    if (_block_compress_codec != nullptr) {
+        Slice compressed_data;
+        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        Slice dict_slice(dict_data.get(), uncompressed_size);
+        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, 
&dict_slice));
+    } else {
+        Slice dict_slice;
+        RETURN_IF_ERROR(_page_reader->get_page_date(dict_slice));
+        // The data is stored by BufferedStreamReader, we should copy it out
+        memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+    }
+
+    // Cache page decoder
+    std::unique_ptr<Decoder> page_decoder;
+    Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY, 
page_decoder);
+    // Set type length
+    page_decoder->set_type_length(_get_type_length());
+    // Initialize the time convert context
+    page_decoder->init(_field_schema, _ctz);
+    // Set the dictionary data
+    RETURN_IF_ERROR(page_decoder->set_dict(dict_data, 
header.dictionary_page_header.num_values));
+    _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = 
std::move(page_decoder);
+
     return Status::OK();
 }
 
@@ -138,6 +174,16 @@ Status ColumnChunkReader::skip_values(size_t num_values) {
     return _page_decoder->skip_values(num_values);
 }
 
+void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t 
num_values) {
+    DCHECK_GE(_remaining_num_values, num_values);
+    CHECK(doris_column->is_nullable());
+    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+            (*std::move(doris_column)).mutate().get());
+    MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
+    data_column->insert_default();
+    _remaining_num_values -= num_values;
+}
+
 size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
     DCHECK_GT(_max_rep_level, 0);
     return _rep_level_decoder.get_levels(levels, n);
@@ -166,14 +212,6 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& 
doris_column, DataType
     return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
 
-Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
-    if (UNLIKELY(_remaining_num_values < num_values)) {
-        return Status::IOError("Decode too many values in current page");
-    }
-    _remaining_num_values -= num_values;
-    return _page_decoder->decode_values(slice, num_values);
-}
-
 int32_t ColumnChunkReader::_get_type_length() {
     switch (_field_schema->physical_type) {
     case tparquet::Type::INT32:
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index bc3fcedbe1..79fdc204dc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -87,7 +87,7 @@ public:
     uint32_t remaining_num_values() const { return _remaining_num_values; };
     // null values are generated from definition levels
     // the caller should maintain the consistency after analyzing null values 
from definition levels.
-    void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; 
};
+    void insert_null_values(ColumnPtr& doris_column, size_t num_values);
     // Get the raw data of current page.
     Slice& get_page_data() { return _page_data; }
 
@@ -99,8 +99,6 @@ public:
     // Decode values in current page into doris column.
     Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, 
size_t num_values);
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type, size_t num_values);
-    // For test, Decode values in current page into slice.
-    Status decode_values(Slice& slice, size_t num_values);
 
     // Get the repetition level decoder of current page.
     LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet 
b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet
new file mode 100644
index 0000000000..268d1d7135
Binary files /dev/null and 
b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet differ
diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.txt 
b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt
new file mode 100644
index 0000000000..ba0a3fba5a
--- /dev/null
+++ b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt
@@ -0,0 +1,16 @@
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27,
 
9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.txt 
b/be/test/exec/test_data/parquet_scanner/type-decoder.txt
new file mode 100644
index 0000000000..afe2b6e573
--- /dev/null
+++ b/be/test/exec/test_data/parquet_scanner/type-decoder.txt
@@ -0,0 +1,14 @@
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27,
 
9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|                         -1|                           -1|                    
  -1|                         -1|                           0|                  
 -1.140000|                    -1.140000|                      s-row0|          
            b-row0|              2022-08-01 07:23:17|                         
-1.140000000|                    c-row0|                      vc-row0|          
    2022-08-01|                   2022-08-01|                   2022-08-01 
07:23:17|
+|                          2|                            2|                    
   2|                          2|                           1|                  
  2.140000|                     2.140000|                        NULL|          
            b-row1|              2022-08-02 07:23:18|                          
2.140000000|                    c-row1|                      vc-row1|           
   2022-08-02|                   2022-08-02|                   2022-08-02 
07:23:18|
+|                         -3|                           -3|                    
  -3|                         -3|                           0|                  
 -3.140000|                    -3.140000|                      s-row2|          
            b-row2|              2022-08-03 07:23:19|                         
-3.140000000|                    c-row2|                      vc-row2|          
    2022-08-03|                   2022-08-03|                   2022-08-03 
07:23:19|
+|                          4|                            4|                    
   4|                          4|                           1|                  
  4.140000|                     4.140000|                        NULL|          
            b-row3|              2022-08-04 07:24:17|                          
4.140000000|                    c-row3|                      vc-row3|           
   2022-08-04|                   2022-08-04|                   2022-08-04 
07:24:17|
+|                         -5|                           -5|                    
  -5|                         -5|                           0|                  
 -5.140000|                    -5.140000|                      s-row4|          
            b-row4|              2022-08-05 07:25:17|                         
-5.140000000|                    c-row4|                      vc-row4|          
    2022-08-05|                   2022-08-05|                   2022-08-05 
07:25:17|
+|                          6|                            6|                    
   6|                          6|                           0|                  
  6.140000|                     6.140000|                      s-row5|          
            b-row5|              2022-08-06 07:26:17|                          
6.140000000|                    c-row5|                      vc-row5|           
   2022-08-06|                   2022-08-06|                   2022-08-06 
07:26:17|
+|                         -7|                           -7|                    
  -7|                         -7|                           1|                  
 -7.140000|                    -7.140000|                      s-row6|          
            b-row6|              2022-08-07 07:27:17|                         
-7.140000000|                    c-row6|                      vc-row6|          
    2022-08-07|                   2022-08-07|                   2022-08-07 
07:27:17|
+|                          8|                            8|                    
   8|                          8|                           0|                  
  8.140000|                     8.140000|                        NULL|          
            b-row7|              2022-08-08 07:28:17|                          
8.140000000|                    c-row7|                      vc-row7|           
   2022-08-08|                   2022-08-08|                   2022-08-08 
07:28:17|
+|                         -9|                           -9|                    
  -9|                         -9|                           0|                  
 -9.140000|                    -9.140000|                      s-row8|          
            b-row8|              2022-08-09 07:29:17|                         
-9.140000000|                    c-row8|                      vc-row8|          
    2022-08-09|                   2022-08-09|                   2022-08-09 
07:29:17|
+|                         10|                           10|                    
  10|                         10|                           0|                  
 10.140000|                    10.140000|                      s-row9|          
            b-row9|              2022-08-10 07:21:17|                         
10.140000000|                    c-row9|                      vc-row9|          
    2022-08-10|                   2022-08-10|                   2022-08-10 
07:21:17|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c0f62067d3..75cc087d12 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -131,9 +131,25 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
 }
 
+static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions, 
size_t num_values) {
+    CHECK(doris_column->is_nullable());
+    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+            (*std::move(doris_column)).mutate().get());
+    NullMap& map_data = nullable_column->get_null_map_data();
+    int null_cnt = 0;
+    for (int i = 0; i < num_values; ++i) {
+        bool nullable = definitions[i] == 0;
+        if (nullable) {
+            null_cnt++;
+        }
+        map_data.emplace_back(nullable);
+    }
+    return null_cnt;
+}
+
 static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk* column_chunk,
                                 FieldSchema* field_schema, ColumnPtr& 
doris_column,
-                                DataTypePtr& data_type) {
+                                DataTypePtr& data_type, level_t* definitions) {
     tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
     size_t start_offset = chunk_meta.__isset.dictionary_page_offset
                                   ? chunk_meta.dictionary_page_offset
@@ -150,8 +166,46 @@ static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk*
     chunk_reader.next_page();
     // load page data into underlying container
     chunk_reader.load_page_data();
+    int rows = chunk_reader.remaining_num_values();
+    // definition levels
+    if (field_schema->definition_level == 0) { // required field
+        std::fill(definitions, definitions + rows, 1);
+    } else {
+        chunk_reader.get_def_levels(definitions, rows);
+    }
+    // fill nullable values
+    fill_nullable_column(doris_column, definitions, rows);
     // decode page data
-    return chunk_reader.decode_values(doris_column, data_type, 
chunk_reader.remaining_num_values());
+    if (field_schema->definition_level == 0) {
+        // required column
+        return chunk_reader.decode_values(doris_column, data_type, rows);
+    } else {
+        // column with null values
+        level_t level_type = definitions[0];
+        int num_values = 1;
+        for (int i = 1; i < rows; ++i) {
+            if (definitions[i] != level_type) {
+                if (level_type == 0) {
+                    // null values
+                    chunk_reader.insert_null_values(doris_column, num_values);
+                } else {
+                    RETURN_IF_ERROR(
+                            chunk_reader.decode_values(doris_column, 
data_type, num_values));
+                }
+                level_type = definitions[i];
+                num_values = 1;
+            } else {
+                num_values++;
+            }
+        }
+        if (level_type == 0) {
+            // null values
+            chunk_reader.insert_null_values(doris_column, num_values);
+        } else {
+            RETURN_IF_ERROR(chunk_reader.decode_values(doris_column, 
data_type, num_values));
+        }
+        return Status::OK();
+    }
 }
 
 static void create_block(std::unique_ptr<vectorized::Block>& block) {
@@ -192,10 +246,11 @@ static void 
create_block(std::unique_ptr<vectorized::Block>& block) {
     }
 }
 
-TEST_F(ParquetThriftReaderTest, type_decoder) {
+static void read_parquet_data_and_check(const std::string& parquet_file,
+                                        const std::string& result_file, int 
rows) {
     /*
-     * type-decoder.parquet is the part of following table:
-     * create table `type_decoder`(
+     * table schema in parquet file:
+     * create table `decoder`(
      * `tinyint_col` tinyint, // 0
      * `smallint_col` smallint, // 1
      * `int_col` int, // 2
@@ -213,20 +268,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
      * `list_string` array<string>) // 14
      */
 
-    LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
-    /*
-     * Data in type-decoder.parquet:
-     * -1      -1      -1      -1      false   -1.14   -1.14   s-row0  b-row0  
2022-08-01 07:23:17     -1.14   c-row0          vc-row0 2022-08-01      
["as-0","as-1"]
-     * 2       2       2       2       true    2.14    2.14    NULL    b-row1  
2022-08-02 07:23:18     2.14    c-row1          vc-row1 2022-08-02      
[null,"as-3"]
-     * -3      -3      -3      -3      false   -3.14   -3.14   s-row2  b-row2  
2022-08-03 07:23:19     -3.14   c-row2          vc-row2 2022-08-03      []
-     * 4       4       4       4       true    4.14    4.14    NULL    b-row3  
2022-08-04 07:24:17     4.14    c-row3          vc-row3 2022-08-04      ["as-4"]
-     * -5      -5      -5      -5      false   -5.14   -5.14   s-row4  b-row4  
2022-08-05 07:25:17     -5.14   c-row4          vc-row4 2022-08-05      
["as-5",null]
-     * 6       6       6       6       false   6.14    6.14    s-row5  b-row5  
2022-08-06 07:26:17     6.14    c-row5          vc-row5 2022-08-06      
[null,null]
-     * -7      -7      -7      -7      true    -7.14   -7.14   s-row6  b-row6  
2022-08-07 07:27:17     -7.14   c-row6          vc-row6 2022-08-07      
["as-6","as-7"]
-     * 8       8       8       8       false   8.14    8.14    NULL    b-row7  
2022-08-08 07:28:17     8.14    c-row7          vc-row7 2022-08-08      
["as-0","as-8"]
-     * -9      -9      -9      -9      false   -9.14   -9.14   s-row8  b-row8  
2022-08-09 07:29:17     -9.14   c-row8          vc-row8 2022-08-09      
["as-9","as-10"]
-     * 10      10      10      10      false   10.14   10.14   s-row9  b-row9  
2022-08-10 07:21:17     10.14   c-row9          vc-row9 2022-08-10      
["as-11","as-12"]
-     */
+    LocalFileReader reader(parquet_file, 0);
     auto st = reader.open();
     EXPECT_TRUE(st.ok());
 
@@ -237,194 +279,15 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
     tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
     FieldDescriptor schema_descriptor;
     schema_descriptor.parse_from_thrift(t_metadata.schema);
-    int rows = 10;
+    level_t defs[rows];
 
-    // the physical_type of tinyint_col, smallint_col and int_col are all INT32
-    // they are distinguished by converted_type(in 
FieldSchema.parquet_schema.converted_type)
-    {
-        auto& column_name_with_type = block->get_by_position(0);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
-                          data_type);
-        int int_sum = 0;
-        for (int i = 0; i < rows; ++i) {
-            int_sum += (int8_t)data_column->get64(i);
-        }
-        ASSERT_EQ(int_sum, 5);
-    }
-    {
-        auto& column_name_with_type = block->get_by_position(1);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
-                          data_type);
-        int int_sum = 0;
-        for (int i = 0; i < rows; ++i) {
-            int_sum += (int16_t)data_column->get64(i);
-        }
-        ASSERT_EQ(int_sum, 5);
-    }
-    {
-        auto& column_name_with_type = block->get_by_position(2);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
-                          data_type);
-        int int_sum = 0;
-        for (int i = 0; i < rows; ++i) {
-            int_sum += (int32_t)data_column->get64(i);
-        }
-        ASSERT_EQ(int_sum, 5);
-    }
-    {
-        auto& column_name_with_type = block->get_by_position(3);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
-                          data_type);
-        int64_t int_sum = 0;
-        for (int i = 0; i < rows; ++i) {
-            int_sum += (int64_t)data_column->get64(i);
-        }
-        ASSERT_EQ(int_sum, 5);
-    }
-    // `boolean_col` boolean, // 4
-    {
-        auto& column_name_with_type = block->get_by_position(4);
+    for (int c = 0; c < 14; ++c) {
+        auto& column_name_with_type = block->get_by_position(c);
         auto& data_column = column_name_with_type.column;
         auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
-                          data_type);
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
-        ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
-        ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
-        ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
-        ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
-    }
-    // `double_col` double, // 6
-    {
-        auto& column_name_with_type = block->get_by_position(6);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        ASSERT_EQ(nested_column->get_float64(0), -1.14);
-        ASSERT_EQ(nested_column->get_float64(1), 2.14);
-        ASSERT_EQ(nested_column->get_float64(2), -3.14);
-        ASSERT_EQ(nested_column->get_float64(3), 4.14);
-    }
-    // `string_col` string, // 7
-    {
-        auto& column_name_with_type = block->get_by_position(7);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        tparquet::ColumnChunk column_chunk = 
t_metadata.row_groups[0].columns[7];
-        tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
-        size_t start_offset = chunk_meta.__isset.dictionary_page_offset
-                                      ? chunk_meta.dictionary_page_offset
-                                      : chunk_meta.data_page_offset;
-        size_t chunk_size = chunk_meta.total_compressed_size;
-        BufferedFileStreamReader stream_reader(&reader, start_offset, 
chunk_size);
-        cctz::time_zone ctz;
-        TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
ctz);
-        ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
-                                       
const_cast<FieldSchema*>(schema_descriptor.get_column(7)),
-                                       &ctz);
-        // initialize chunk reader
-        chunk_reader.init();
-        // seek to next page header
-        chunk_reader.next_page();
-        // load page data into underlying container
-        chunk_reader.load_page_data();
-
-        level_t defs[rows];
-        // Analyze null string
-        chunk_reader.get_def_levels(defs, rows);
-        ASSERT_EQ(defs[1], 0);
-        ASSERT_EQ(defs[3], 0);
-        ASSERT_EQ(defs[7], 0);
-
-        chunk_reader.decode_values(data_column, data_type, 7);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        auto row0 = nested_column->get_data_at(0).data;
-        auto row2 = nested_column->get_data_at(1).data;
-        ASSERT_STREQ("s-row0", row0);
-        ASSERT_STREQ("s-row2", row2);
-    }
-    // `timestamp_col` timestamp, // 9, DATETIME
-    {
-        auto& column_name_with_type = block->get_by_position(9);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        int64_t date_value = (int64_t)nested_column->get64(0);
-        VecDateTimeInt64Union conv = {.i64 = date_value};
-        auto dt = conv.dt;
-        ASSERT_EQ(dt.hour(), 7);
-        ASSERT_EQ(dt.minute(), 23);
-        ASSERT_EQ(dt.second(), 17);
-    }
-    // `decimal_col` decimal, // 10
-    {
-        auto& column_name_with_type = block->get_by_position(10);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[10],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(10)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        int neg = 1;
-        for (int i = 0; i < rows; ++i) {
-            neg *= -1;
-            auto decimal_field = nested_column->operator[](i)
-                                         
.get<vectorized::DecimalField<vectorized::Decimal128>>();
-            EXPECT_EQ(DecimalV2Value(decimal_field.get_value()),
-                      DecimalV2Value(std::to_string(neg * (1.14 + i))));
-        }
-    }
-    // `date_col` date, // 13, DATE
-    {
-        auto& column_name_with_type = block->get_by_position(13);
-        auto& data_column = column_name_with_type.column;
-        auto& data_type = column_name_with_type.type;
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        for (int i = 0; i < rows; ++i) {
-            int64_t date_value = (int64_t)nested_column->get64(i);
-            VecDateTimeInt64Union conv = {.i64 = date_value};
-            auto dt = conv.dt;
-            ASSERT_EQ(dt.year(), 2022);
-            ASSERT_EQ(dt.month(), 8);
-            ASSERT_EQ(dt.day(), i + 1);
-        }
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[c],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column,
+                          data_type, defs);
     }
     // `date_v2_col` date, // 14 - 13, DATEV2
     {
@@ -433,18 +296,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         auto& data_type = column_name_with_type.type;
         get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
                           
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        for (int i = 0; i < rows; ++i) {
-            uint32_t date_value = (uint32_t)nested_column->get64(i);
-            DateV2UInt32Union conv = {.ui32 = date_value};
-            auto dt = conv.dt;
-            ASSERT_EQ(dt.year(), 2022);
-            ASSERT_EQ(dt.month(), 8);
-            ASSERT_EQ(dt.day(), i + 1);
-        }
+                          data_type, defs);
     }
     // `timestamp_v2_col` timestamp, // 15 - 9, DATETIMEV2
     {
@@ -453,17 +305,28 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         auto& data_type = column_name_with_type.type;
         get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
                           
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
-                          data_type);
-        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
-                (*std::move(data_column)).mutate().get());
-        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
-        uint64_t date_value = nested_column->get64(0);
-        DateTimeV2UInt64Union conv = {.ui64 = date_value};
-        auto dt = conv.dt;
-        ASSERT_EQ(dt.hour(), 7);
-        ASSERT_EQ(dt.minute(), 23);
-        ASSERT_EQ(dt.second(), 17);
+                          data_type, defs);
     }
+
+    LocalFileReader result(result_file, 0);
+    auto rst = result.open();
+    EXPECT_TRUE(rst.ok());
+    uint8_t result_buf[result.size() + 1];
+    result_buf[result.size()] = '\0';
+    int64_t bytes_read;
+    bool eof;
+    result.read(result_buf, result.size(), &bytes_read, &eof);
+    ASSERT_STREQ(block->dump_data(0, rows).c_str(), 
reinterpret_cast<char*>(result_buf));
+}
+
+TEST_F(ParquetThriftReaderTest, type_decoder) {
+    
read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
+                                
"./be/test/exec/test_data/parquet_scanner/type-decoder.txt", 10);
+}
+
+TEST_F(ParquetThriftReaderTest, dict_decoder) {
+    
read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/dict-decoder.parquet",
+                                
"./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12);
 }
 
 TEST_F(ParquetThriftReaderTest, column_reader) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to