This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f5aed27ec [feature-wip](parquet-reader)read and decode parquet 
physical type (#11637)
8f5aed27ec is described below

commit 8f5aed27ecdf01028393c0888acc8e1de3b1eeb1
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu Aug 11 10:17:32 2022 +0800

    [feature-wip](parquet-reader)read and decode parquet physical type (#11637)
    
    # Proposed changes
    
    Read and decode parquet physical type.
    1. Booleans are encoded with bit-packing; this PR introduces the bit-packing implementation from Impala.
    2. Create a Parquet file that includes all the primitive types supported by Hive.
    
    ## Remaining Problems
    1. At present, only physical types are decoded; there are no mapping or conversion methods to Doris logical types yet.
    2. Decimal / Timestamp / Date types are not parsed or processed yet.
    3. Int_8 / Int_16 are stored as Int_32; how to resolve these types is still an open question.
---
 be/src/util/bit_packing.h                          | 140 ++++++++
 be/src/util/bit_packing.inline.h                   | 380 +++++++++++++++++++++
 be/src/util/bit_stream_utils.h                     |  99 ++++++
 be/src/util/bit_stream_utils.inline.h              | 101 ++++++
 be/src/util/bit_util.h                             |   8 +
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 163 ++++++++-
 be/src/vec/exec/format/parquet/parquet_common.h    | 115 ++++++-
 .../parquet/vparquet_column_chunk_reader.cpp       |  14 +-
 .../format/parquet/vparquet_column_chunk_reader.h  |   5 +-
 .../test_data/parquet_scanner/type-decoder.parquet | Bin 338 -> 3405 bytes
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 165 ++++++---
 11 files changed, 1128 insertions(+), 62 deletions(-)

diff --git a/be/src/util/bit_packing.h b/be/src/util/bit_packing.h
new file mode 100644
index 0000000000..61a389512a
--- /dev/null
+++ b/be/src/util/bit_packing.h
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The implementation of BitPacking is from Impala.
+
+#pragma once
+
+#include "util/bit_util.h"
+
+namespace doris {
+
+/// Utilities for manipulating bit-packed values. Bit-packing is a technique 
for
+/// compressing integer values that do not use the full range of the integer 
type.
+/// E.g. an array of uint32_t values with range [0, 31] only uses the lower 5 
bits
+/// of every uint32_t value, or an array of 0/1 booleans only uses the lowest 
bit
+/// of each integer.
+///
+/// Bit-packing always has a "bit width" parameter that determines the range of
+/// representable unsigned values: [0, 2^bit_width - 1]. The packed 
representation
+/// is logically the concatenation of the lower bits of the input values (in
+/// little-endian order). E.g. the values 1, 2, 3, 4 packed with bit width 4 
results
+/// in the two output bytes: [ 0 0 1 0 | 0 0 0 1 ] [ 0 1 0 0 | 0 0 1 1 ]
+///                               2         1           4         3
+///
+/// Packed values can be split across words, e.g. packing 1, 17 with bit_width 
5 results
+/// in the two output bytes: [ 0 0 1 | 0 0 0 0 1 ] [ x x x x x x | 1 0 ]
+///            lower bits of 17--^         1         next value     ^--upper 
bits of 17
+///
+/// Bit widths from 0 to 64 are supported (0 bit width means that every value 
is 0).
+/// The batched unpacking functions operate on batches of 32 values. This 
batch size
+/// is convenient because for every supported bit width, the end of a 32 value 
batch
+/// falls on a byte boundary. It is also large enough to amortise loop 
overheads.
+class BitPacking {
+public:
+    static constexpr int MAX_BITWIDTH = sizeof(uint64_t) * 8;
+    static constexpr int MAX_DICT_BITWIDTH = sizeof(uint32_t) * 8;
+
+    /// Unpack bit-packed values with 'bit_width' from 'in' to 'out'. Keeps 
unpacking until
+    /// either all 'in_bytes' are read or 'num_values' values are unpacked. 
'out' must have
+    /// enough space for 'num_values'. 0 <= 'bit_width' <= 64 and 'bit_width' 
<= # of bits
+    /// in OutType. 'in' must point to 'in_bytes' of addressable memory.
+    ///
+    /// Returns a pointer to the byte after the last byte of 'in' that was 
read and also the
+    /// number of values that were read. If the caller wants to continue 
reading packed
+    /// values after the last one returned, it must ensure that the next value 
to unpack
+    /// starts at a byte boundary. This is true if 'num_values' is a multiple 
of 32, or
+    /// more generally if (bit_width * num_values) % 8 == 0.
+    template <typename OutType>
+    static std::pair<const uint8_t*, int64_t> UnpackValues(int bit_width,
+                                                           const uint8_t* 
__restrict__ in,
+                                                           int64_t in_bytes, 
int64_t num_values,
+                                                           OutType* 
__restrict__ out);
+
+    /// Same as above, templated by BIT_WIDTH.
+    template <typename OutType, int BIT_WIDTH>
+    static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* 
__restrict__ in,
+                                                           int64_t in_bytes, 
int64_t num_values,
+                                                           OutType* 
__restrict__ out);
+
+    /// Unpack values as above, treating them as unsigned integers, and decode 
them
+    /// using the provided dict. Writes them to 'out' with a stride of 
'stride' bytes.
+    /// Sets 'decode_error' to true if one of the packed values was greater 
than or equal to 'dict_len'.
+    /// Does not modify 'decode_error' on success.
+    template <typename OutType>
+    static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
+            int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes,
+            OutType* __restrict__ dict, int64_t dict_len, int64_t num_values,
+            OutType* __restrict__ out, int64_t stride, bool* __restrict__ 
decode_error);
+
+    /// Same as above, templated by BIT_WIDTH.
+    template <typename OutType, int BIT_WIDTH>
+    static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
+            const uint8_t* __restrict__ in, int64_t in_bytes, OutType* 
__restrict__ dict,
+            int64_t dict_len, int64_t num_values, OutType* __restrict__ out, 
int64_t stride,
+            bool* __restrict__ decode_error);
+
+    /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must 
point to
+    /// 'in_bytes' of addressable memory, and 'in_bytes' must be at least
+    /// (32 * bit_width / 8). 'out' must have space for 32 OutType values.
+    /// 0 <= 'bit_width' <= 64 and 'bit_width' <= # of bits in OutType.
+    template <typename OutType>
+    static const uint8_t* Unpack32Values(int bit_width, const uint8_t* 
__restrict__ in,
+                                         int64_t in_bytes, OutType* 
__restrict__ out);
+
+    /// Same as Unpack32Values() but templated by BIT_WIDTH.
+    template <typename OutType, int BIT_WIDTH>
+    static const uint8_t* Unpack32Values(const uint8_t* __restrict__ in, 
int64_t in_bytes,
+                                         OutType* __restrict__ out);
+
+    /// Same as Unpack32Values() with dictionary decoding.
+    template <typename OutType>
+    static const uint8_t* UnpackAndDecode32Values(int bit_width, const 
uint8_t* __restrict__ in,
+                                                  int64_t in_bytes, OutType* 
__restrict__ dict,
+                                                  int64_t dict_len, OutType* 
__restrict__ out,
+                                                  int64_t stride, bool* 
__restrict__ decode_error);
+
+    /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
+    template <typename OutType, int BIT_WIDTH>
+    static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ 
in, int64_t in_bytes,
+                                                  OutType* __restrict__ dict, 
int64_t dict_len,
+                                                  OutType* __restrict__ out, 
int64_t stride,
+                                                  bool* __restrict__ 
decode_error);
+
+    /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 
'out'.
+    /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of 
addressable
+    /// memory, and 'in_bytes' must be at least ceil(num_values * bit_width / 
8).
+    /// 'out' must have space for 'num_values' OutType values.
+    /// 0 <= 'bit_width' <= 64 and 'bit_width' <= # of bits in OutType.
+    template <typename OutType, int BIT_WIDTH>
+    static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in, 
int64_t in_bytes,
+                                             int num_values, OutType* 
__restrict__ out);
+
+    /// Same as UnpackUpTo31Values() with dictionary decoding.
+    template <typename OutType, int BIT_WIDTH>
+    static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* 
__restrict__ in,
+                                                      int64_t in_bytes, 
OutType* __restrict__ dict,
+                                                      int64_t dict_len, int 
num_values,
+                                                      OutType* __restrict__ 
out, int64_t stride,
+                                                      bool* __restrict__ 
decode_error);
+
+private:
+    /// Compute the number of values with the given bit width that can be 
unpacked from
+    /// an input buffer of 'in_bytes' into an output buffer with space for 
'num_values'.
+    static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t 
num_values);
+};
+} // namespace doris
diff --git a/be/src/util/bit_packing.inline.h b/be/src/util/bit_packing.inline.h
new file mode 100644
index 0000000000..51efbb125d
--- /dev/null
+++ b/be/src/util/bit_packing.inline.h
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The implementation of BitPacking is from Impala.
+
+#include <boost/preprocessor/repetition/repeat_from_to.hpp>
+
+#include "util/bit_packing.h"
+
+namespace doris {
+
+inline int64_t BitPacking::NumValuesToUnpack(int bit_width, int64_t in_bytes, 
int64_t num_values) {
+    // Check if we have enough input bytes to decode 'num_values'.
+    if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= 
in_bytes) {
+        // Limited by output space.
+        return num_values;
+    } else {
+        // Limited by the number of input bytes. Compute the number of values 
that can be
+        // unpacked from the input.
+        return (in_bytes * CHAR_BIT) / bit_width;
+    }
+}
+
+template <typename T>
+constexpr bool IsSupportedUnpackingType() {
+    return std::is_same<T, uint8_t>::value || std::is_same<T, uint16_t>::value 
||
+           std::is_same<T, uint32_t>::value || std::is_same<T, 
uint64_t>::value;
+}
+
+template <typename OutType>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width,
+                                                            const uint8_t* 
__restrict__ in,
+                                                            int64_t in_bytes, 
int64_t num_values,
+                                                            OutType* 
__restrict__ out) {
+    static_assert(IsSupportedUnpackingType<OutType>(), "Only unsigned integers 
are supported.");
+
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+    case i:                                     \
+        return UnpackValues<OutType, i>(in, in_bytes, num_values, out);
+
+    switch (bit_width) {
+        // Expand cases from 0 to 64.
+        BOOST_PP_REPEAT_FROM_TO(0, 65, UNPACK_VALUES_CASE, ignore);
+    default:
+        DCHECK(false);
+        return std::make_pair(nullptr, -1);
+    }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(const uint8_t* 
__restrict__ in,
+                                                            int64_t in_bytes, 
int64_t num_values,
+                                                            OutType* 
__restrict__ out) {
+    static_assert(IsSupportedUnpackingType<OutType>(), "Only unsigned integers 
are supported.");
+
+    constexpr int BATCH_SIZE = 32;
+    const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, 
num_values);
+    const int64_t batches_to_read = values_to_read / BATCH_SIZE;
+    const int64_t remainder_values = values_to_read % BATCH_SIZE;
+    const uint8_t* in_pos = in;
+    OutType* out_pos = out;
+
+    // First unpack as many full batches as possible.
+    for (int64_t i = 0; i < batches_to_read; ++i) {
+        in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos);
+        out_pos += BATCH_SIZE;
+        in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
+    }
+
+    // Then unpack the final partial batch.
+    if (remainder_values > 0) {
+        in_pos =
+                UnpackUpTo31Values<OutType, BIT_WIDTH>(in_pos, in_bytes, 
remainder_values, out_pos);
+    }
+    return std::make_pair(in_pos, values_to_read);
+}
+
+template <typename OutType>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
+        int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, 
OutType* __restrict__ dict,
+        int64_t dict_len, int64_t num_values, OutType* __restrict__ out, 
int64_t stride,
+        bool* __restrict__ decode_error) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2)                                
                 \
+    case i:                                                                    
                 \
+        return UnpackAndDecodeValues<OutType, i>(in, in_bytes, dict, dict_len, 
num_values, out, \
+                                                 stride, decode_error);
+
+    switch (bit_width) {
+        // Expand cases from 0 to MAX_DICT_BITWIDTH.
+        BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+    default:
+        DCHECK(false);
+        return std::make_pair(nullptr, -1);
+    }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
+        const uint8_t* __restrict__ in, int64_t in_bytes, OutType* 
__restrict__ dict,
+        int64_t dict_len, int64_t num_values, OutType* __restrict__ out, 
int64_t stride,
+        bool* __restrict__ decode_error) {
+    constexpr int BATCH_SIZE = 32;
+    const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, 
num_values);
+    const int64_t batches_to_read = values_to_read / BATCH_SIZE;
+    const int64_t remainder_values = values_to_read % BATCH_SIZE;
+    const uint8_t* in_pos = in;
+    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out);
+    // First unpack as many full batches as possible.
+    for (int64_t i = 0; i < batches_to_read; ++i) {
+        in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, 
dict, dict_len,
+                                                             
reinterpret_cast<OutType*>(out_pos),
+                                                             stride, 
decode_error);
+        out_pos += stride * BATCH_SIZE;
+        in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
+    }
+    // Then unpack the final partial batch.
+    if (remainder_values > 0) {
+        in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
+                in_pos, in_bytes, dict, dict_len, remainder_values,
+                reinterpret_cast<OutType*>(out_pos), stride, decode_error);
+    }
+    return std::make_pair(in_pos, values_to_read);
+}
+
+// Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit 
width of
+// the packed values. 'in_buf' is the start of the input buffer and 'out_vals' 
is the
+// start of the output values array. This function unpacks the VALUE_IDX'th 
packed value
+// from 'in_buf'.
+//
+// This implements essentially the same algorithm as the (Apache-licensed) 
code in
+// bpacking.c at https://github.com/lemire/FrameOfReference/, but is much more 
compact
+// because it uses templates rather than source-level unrolling of all 
combinations.
+//
+// After the template parameters are expanded and constants are propagated, 
all branches
+// and offset/shift calculations should be optimized out, leaving only shifts 
by constants
+// and bitmasks by constants. Calls to this must be stamped out manually or 
with
+// BOOST_PP_REPEAT_FROM_TO: experimentation revealed that the GCC 4.9.2 
optimiser was
+// not able to fully propagate constants and remove branches when this was 
called from
+// inside a for loop with constant bounds with VALUE_IDX changed to a function 
argument.
+//
+// We compute how many 32 bit words we have to read, which is either 1, 2 or 
3. If it is
+// at least 2, the first two 32 bit words are read as one 64 bit word. Even if 
only one
+// word needs to be read, we try to read 64 bits if it does not lead to buffer 
overflow
+// because benchmarks show that it has a positive effect on performance.
+//
+// If 'FULL_BATCH' is true, this function call is part of unpacking 32 values, 
otherwise
+// up to 31 values. This is needed to optimise the length of the reads (32 or 
64 bits) and
+// avoid buffer overflow (if we are unpacking 32 values, we can safely assume 
an input
+// buffer of length 32 * BIT_WIDTH).
+template <int BIT_WIDTH, int VALUE_IDX, bool FULL_BATCH>
+inline uint64_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) {
+    if (BIT_WIDTH == 0) return 0;
+
+    constexpr int FIRST_BIT_IDX = VALUE_IDX * BIT_WIDTH;
+    constexpr int FIRST_WORD_IDX = FIRST_BIT_IDX / 32;
+    constexpr int LAST_BIT_IDX = FIRST_BIT_IDX + BIT_WIDTH;
+    constexpr int LAST_WORD_IDX = BitUtil::round_up_numi32(LAST_BIT_IDX);
+    constexpr int WORDS_TO_READ = LAST_WORD_IDX - FIRST_WORD_IDX;
+    static_assert(WORDS_TO_READ <= 3, "At most three 32-bit words need to be 
loaded.");
+
+    constexpr int FIRST_BIT_OFFSET = FIRST_BIT_IDX - FIRST_WORD_IDX * 32;
+    constexpr uint64_t mask = BIT_WIDTH == 64 ? ~0L : (1UL << BIT_WIDTH) - 1;
+    const uint32_t* const in = reinterpret_cast<const uint32_t*>(in_buf);
+
+    // Avoid reading past the end of the buffer. We can safely read 64 bits if 
we know that
+    // this is a full batch read (so the input buffer is 32 * BIT_WIDTH long) 
and there is
+    // enough space in the buffer from the current reading point.
+    // We try to read 64 bits even when it is not necessary because the 
benchmarks show it
+    // is faster.
+    constexpr bool CAN_SAFELY_READ_64_BITS =
+            FULL_BATCH && FIRST_BIT_IDX - FIRST_BIT_OFFSET + 64 <= BIT_WIDTH * 
32;
+
+    // We do not try to read 64 bits when the bit width is a power of two 
(unless it is
+    // necessary) because performance benchmarks show that it is better this 
way. This seems
+    // to be due to compiler optimisation issues, so we can revisit it when we 
update the
+    // compiler version.
+    constexpr bool READ_32_BITS =
+            WORDS_TO_READ == 1 && (!CAN_SAFELY_READ_64_BITS || 
BitUtil::IsPowerOf2(BIT_WIDTH));
+
+    if (READ_32_BITS) {
+        uint32_t word = in[FIRST_WORD_IDX];
+        word >>= FIRST_BIT_OFFSET < 32 ? FIRST_BIT_OFFSET : 0;
+        return word & mask;
+    }
+
+    uint64_t word = *reinterpret_cast<const uint64_t*>(in + FIRST_WORD_IDX);
+    word >>= FIRST_BIT_OFFSET;
+
+    if (WORDS_TO_READ > 2) {
+        constexpr int USEFUL_BITS = FIRST_BIT_OFFSET == 0 ? 0 : 64 - 
FIRST_BIT_OFFSET;
+        uint64_t extra_word = in[FIRST_WORD_IDX + 2];
+        word |= extra_word << USEFUL_BITS;
+    }
+
+    return word & mask;
+}
+
+template <typename OutType>
+inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t 
dict_len, uint32_t idx,
+                                      OutType* __restrict__ out_val,
+                                      bool* __restrict__ decode_error) {
+    if (UNLIKELY(idx >= dict_len)) {
+        *decode_error = true;
+    } else {
+        // Use memcpy() because we can't assume sufficient alignment in some 
cases (e.g.
+        // 16 byte decimals).
+        memcpy(out_val, &dict[idx], sizeof(OutType));
+    }
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::Unpack32Values(const uint8_t* __restrict__ in, 
int64_t in_bytes,
+                                          OutType* __restrict__ out) {
+    static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+    static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high");
+    DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high 
for output";
+    constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
+    DCHECK_GE(in_bytes, BYTES_TO_READ);
+
+    // Call UnpackValue for 0 <= i < 32.
+#pragma push_macro("UNPACK_VALUE_CALL")
+#define UNPACK_VALUE_CALL(ignore1, i, ignore2) \
+    out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i, true>(in));
+
+    BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore);
+    return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUE_CALL")
+}
+
+template <typename OutType>
+const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* 
__restrict__ in,
+                                          int64_t in_bytes, OutType* 
__restrict__ out) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+    case i:                                     \
+        return Unpack32Values<OutType, i>(in, in_bytes, out);
+
+    switch (bit_width) {
+        // Expand cases from 0 to 64.
+        BOOST_PP_REPEAT_FROM_TO(0, 65, UNPACK_VALUES_CASE, ignore);
+    default:
+        DCHECK(false);
+        return in;
+    }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ 
in, int64_t in_bytes,
+                                                   OutType* __restrict__ dict, 
int64_t dict_len,
+                                                   OutType* __restrict__ out, 
int64_t stride,
+                                                   bool* __restrict__ 
decode_error) {
+    static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+    static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high");
+    constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
+    DCHECK_GE(in_bytes, BYTES_TO_READ);
+    // TODO: this could be optimised further by using SIMD instructions.
+    // 
https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/
+
+    static_assert(BIT_WIDTH <= MAX_DICT_BITWIDTH, "Too high bit width for 
dictionary index.");
+
+    // Call UnpackValue() and DecodeValue() for 0 <= i < 32.
+#pragma push_macro("DECODE_VALUE_CALL")
+#define DECODE_VALUE_CALL(ignore1, i, ignore2)                                 
              \
+    {                                                                          
              \
+        uint32_t idx = UnpackValue<BIT_WIDTH, i, true>(in);                    
              \
+        uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + i * stride;       
              \
+        DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), 
decode_error); \
+    }
+
+    BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
+    return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUE_CALL")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in, 
int64_t in_bytes,
+                                              int num_values, OutType* 
__restrict__ out) {
+    static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+    static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high");
+    DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high 
for output";
+    constexpr int MAX_BATCH_SIZE = 31;
+    const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
+    DCHECK_GE(in_bytes, BYTES_TO_READ);
+    DCHECK_LE(num_values, MAX_BATCH_SIZE);
+
+    // Make sure the buffer is at least 1 byte.
+    constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ? (BIT_WIDTH * (MAX_BATCH_SIZE + 
1)) / CHAR_BIT : 1;
+    uint8_t tmp_buffer[TMP_BUFFER_SIZE];
+
+    const uint8_t* in_buffer = in;
+    // Copy into padded temporary buffer to avoid reading past the end of 'in' 
if the
+    // last 32-bit load would go past the end of the buffer.
+    if (BitUtil::round_up(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) {
+        memcpy(tmp_buffer, in, BYTES_TO_READ);
+        in_buffer = tmp_buffer;
+    }
+
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+    case 31 - i:                                \
+        out[30 - i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i, 
false>(in_buffer));
+
+    // Use switch with fall-through cases to minimise branching.
+    switch (num_values) {
+        // Expand cases from 31 down to 1.
+        BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore);
+    case 0:
+        break;
+    default:
+        DCHECK(false);
+    }
+    return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* 
__restrict__ in,
+                                                       int64_t in_bytes, 
OutType* __restrict__ dict,
+                                                       int64_t dict_len, int 
num_values,
+                                                       OutType* __restrict__ 
out, int64_t stride,
+                                                       bool* __restrict__ 
decode_error) {
+    static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+    static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high");
+    constexpr int MAX_BATCH_SIZE = 31;
+    const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
+    DCHECK_GE(in_bytes, BYTES_TO_READ);
+    DCHECK_LE(num_values, MAX_BATCH_SIZE);
+
+    // Make sure the buffer is at least 1 byte.
+    constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ? (BIT_WIDTH * (MAX_BATCH_SIZE + 
1)) / CHAR_BIT : 1;
+    uint8_t tmp_buffer[TMP_BUFFER_SIZE];
+
+    const uint8_t* in_buffer = in;
+    // Copy into padded temporary buffer to avoid reading past the end of 'in' 
if the
+    // last 32-bit load would go past the end of the buffer.
+    if (BitUtil::round_up(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) {
+        memcpy(tmp_buffer, in, BYTES_TO_READ);
+        in_buffer = tmp_buffer;
+    }
+
+#pragma push_macro("DECODE_VALUES_CASE")
+#define DECODE_VALUES_CASE(ignore1, i, ignore2)                                
              \
+    case 31 - i: {                                                             
              \
+        uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer);       
              \
+        uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * 
stride;              \
+        DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), 
decode_error); \
+    }
+
+    // Use switch with fall-through cases to minimise branching.
+    switch (num_values) {
+        // Expand cases from 31 down to 1.
+        BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore);
+    case 0:
+        break;
+    default:
+        DCHECK(false);
+    }
+    return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUES_CASE")
+}
+
+} // namespace doris
diff --git a/be/src/util/bit_stream_utils.h b/be/src/util/bit_stream_utils.h
index 86922539b4..2316af997c 100644
--- a/be/src/util/bit_stream_utils.h
+++ b/be/src/util/bit_stream_utils.h
@@ -21,6 +21,7 @@
 #pragma once
 
 #include "gutil/port.h"
+#include "util/bit_packing.h"
 #include "util/bit_util.h"
 #include "util/faststring.h"
 
@@ -146,4 +147,102 @@ private:
     int bit_offset_;  // Offset in buffered_values_
 };
 
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not. It also has utilities to read multiple bytes in one
+/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient
+/// processing of multiple values at a time.
+class BatchedBitReader {
+public:
+    /// 'buffer' is the buffer to read from.  The buffer's length is 'buffer_len'.
+    /// Does not take ownership of the buffer.
+    BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) { Reset(buffer, buffer_len); }
+
+    /// Creates a reader with no buffer attached. Reset() must be called before reading.
+    BatchedBitReader() = default;
+
+    // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the
+    // two copies do not share any state. Invoking functions on either copy continues
+    // reading from the current read position without modifying the state of the other
+    // copy.
+
+    /// Resets the reader to start reading from the start of 'buffer'. The buffer's
+    /// length is 'buffer_len'. Does not take ownership of the buffer.
+    void Reset(const uint8_t* buffer, int64_t buffer_len) {
+        DCHECK(buffer != nullptr);
+        DCHECK_GE(buffer_len, 0);
+        buffer_pos_ = buffer;
+        buffer_end_ = buffer + buffer_len;
+    }
+
+    /// Gets up to 'num_values' bit-packed values, starting from the current byte in the
+    /// buffer and advance the read position. 'bit_width' must be <= 64.
+    /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bits of the last
+    /// byte are skipped and the next UnpackBatch() call will start reading from the next byte.
+    ///
+    /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the
+    /// total number of values the caller wants to read from a run of bit-packed values, or
+    /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always
+    /// satisfied if 'num_values' is a multiple of 32.
+    ///
+    /// The output type 'T' must be an unsigned integer.
+    ///
+    /// Returns the number of values read.
+    template <typename T>
+    int UnpackBatch(int bit_width, int num_values, T* v);
+
+    /// Skip 'num_values_to_skip' bit-packed values.
+    /// 'num_values_to_skip * bit_width' is either divisible by 8, or
+    /// 'num_values_to_skip' equals to the count of the remaining bit-packed values.
+    /// Returns false if there are not enough bytes left.
+    bool SkipBatch(int bit_width, int num_values_to_skip);
+
+    /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
+    /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is
+    /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
+    /// Otherwise returns the number of values decoded. The values are written to 'v' with
+    /// a stride of 'stride' bytes.
+    template <typename T>
+    int UnpackAndDecodeBatch(int bit_width, T* dict, int64_t dict_len, int num_values, T* v,
+                             int64_t stride);
+
+    /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
+    /// needs to be a little-endian native type and big enough to store 'num_bytes'.
+    /// Returns false if there are not enough bytes left.
+    template <typename T>
+    bool GetBytes(int num_bytes, T* v);
+
+    /// Read an unsigned ULEB-128 encoded int from the stream. The encoded int must start
+    /// at the beginning of a byte. Return false if there were not enough bytes in the
+    /// buffer or the int is invalid. For more details on ULEB-128:
+    /// https://en.wikipedia.org/wiki/LEB128
+    /// UINT_T must be an unsigned integer type.
+    template <typename UINT_T>
+    bool GetUleb128(UINT_T* v);
+
+    /// Read a ZigZag encoded int from the stream. The encoded int must start at the
+    /// beginning of a byte. Return false if there were not enough bytes in the buffer or
+    /// the int is invalid. For more details on ZigZag encoding:
+    /// https://developers.google.com/protocol-buffers/docs/encoding#signed-integers
+    /// INT_T must be a signed integer type.
+    template <typename INT_T>
+    bool GetZigZagInteger(INT_T* v);
+
+    /// Returns the number of bytes left in the stream. Const so that it can be queried
+    /// through a const reader; the pointer difference is narrowed to int explicitly.
+    int bytes_left() const { return static_cast<int>(buffer_end_ - buffer_pos_); }
+
+    /// Maximum byte length of a vlq encoded integer of type T.
+    template <typename T>
+    static constexpr int max_vlq_byte_len() {
+        return BitUtil::Ceil(sizeof(T) * 8, 7);
+    }
+
+    /// Maximum supported bitwidth for reader. Declared constexpr (implicitly inline) so
+    /// that ODR-uses such as DCHECK_LE(bit_width, MAX_BITWIDTH) need no out-of-class
+    /// definition.
+    static constexpr int MAX_BITWIDTH = BitPacking::MAX_BITWIDTH;
+
+private:
+    /// Current read position in the buffer.
+    const uint8_t* buffer_pos_ = nullptr;
+
+    /// Pointer to the byte after the end of the buffer.
+    const uint8_t* buffer_end_ = nullptr;
+};
+
 } // namespace doris
diff --git a/be/src/util/bit_stream_utils.inline.h 
b/be/src/util/bit_stream_utils.inline.h
index 05a5b8e2f3..c9ba2428ae 100644
--- a/be/src/util/bit_stream_utils.inline.h
+++ b/be/src/util/bit_stream_utils.inline.h
@@ -24,6 +24,7 @@
 
 #include "glog/logging.h"
 #include "util/alignment.h"
+#include "util/bit_packing.inline.h"
 #include "util/bit_stream_utils.h"
 
 using doris::BitUtil;
@@ -208,4 +209,104 @@ inline bool BitReader::GetVlqInt(int32_t* v) {
     return true;
 }
 
+/// Unpacks up to 'num_values' values of 'bit_width' bits each into 'v' and moves the
+/// read position to the one returned by BitPacking::UnpackValues().
+/// Returns the number of values unpacked.
+template <typename T>
+inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
+    DCHECK(buffer_pos_ != nullptr);
+    DCHECK_GE(bit_width, 0);
+    DCHECK_LE(bit_width, MAX_BITWIDTH);
+    DCHECK_LE(bit_width, sizeof(T) * 8);
+    DCHECK_GE(num_values, 0);
+
+    // The unpacker reports the new read position and the count of values it produced.
+    const auto unpacked =
+            BitPacking::UnpackValues(bit_width, buffer_pos_, bytes_left(), num_values, v);
+    buffer_pos_ = std::get<0>(unpacked);
+    const int64_t values_read = std::get<1>(unpacked);
+    DCHECK_LE(buffer_pos_, buffer_end_);
+    DCHECK_LE(values_read, num_values);
+    return static_cast<int>(values_read);
+}
+
+/// Skips 'num_values_to_skip' bit-packed values of 'bit_width' bits each by advancing
+/// the read position by the number of bytes they occupy, rounded up to a whole byte.
+/// Returns false, leaving the read position unchanged, if fewer bytes than that remain.
+inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) {
+    DCHECK(buffer_pos_ != nullptr);
+    DCHECK_GT(bit_width, 0);
+    DCHECK_LE(bit_width, MAX_BITWIDTH);
+    DCHECK_GT(num_values_to_skip, 0);
+
+    // Do the arithmetic in 64 bits: 'bit_width * num_values_to_skip' can overflow int
+    // for large batches, which would make the bounds check below meaningless.
+    const int64_t skip_bits = static_cast<int64_t>(bit_width) * num_values_to_skip;
+    const int64_t skip_bytes = (skip_bits + 7) / 8;
+    if (skip_bytes > buffer_end_ - buffer_pos_) return false;
+    buffer_pos_ += skip_bytes;
+    return true;
+}
+
+/// Unpacks up to 'num_values' dictionary indices of 'bit_width' bits each and writes the
+/// corresponding entries of 'dict' to 'v' with a stride of 'stride' bytes.
+/// Returns the number of values decoded, or -1 (without moving the read position) if
+/// the unpacker reports a decode error.
+template <typename T>
+inline int BatchedBitReader::UnpackAndDecodeBatch(int bit_width, T* dict, int64_t dict_len,
+                                                  int num_values, T* v, int64_t stride) {
+    DCHECK(buffer_pos_ != nullptr);
+    DCHECK_GE(bit_width, 0);
+    DCHECK_LE(bit_width, MAX_BITWIDTH);
+    DCHECK_GE(num_values, 0);
+
+    bool decode_error = false;
+    const auto decoded =
+            BitPacking::UnpackAndDecodeValues(bit_width, buffer_pos_, bytes_left(), dict, dict_len,
+                                              num_values, v, stride, &decode_error);
+    if (UNLIKELY(decode_error)) return -1;
+
+    // Only commit the new read position once decoding is known to have succeeded.
+    buffer_pos_ = std::get<0>(decoded);
+    const int64_t values_read = std::get<1>(decoded);
+    DCHECK_LE(buffer_pos_, buffer_end_);
+    DCHECK_LE(values_read, num_values);
+    return static_cast<int>(values_read);
+}
+
+/// Copies the next 'num_bytes' bytes of the buffer into the low-order bytes of '*v'
+/// (remaining bytes of '*v' are zeroed) and advances the read position.
+/// Returns false, leaving the read position unchanged, if fewer than 'num_bytes'
+/// bytes remain.
+template <typename T>
+inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) {
+    DCHECK(buffer_pos_ != nullptr);
+    DCHECK_GE(num_bytes, 0);
+    DCHECK_LE(num_bytes, sizeof(T));
+    // Compare against the remaining length instead of forming 'buffer_pos_ + num_bytes':
+    // a pointer past the end of the buffer is undefined behaviour.
+    if (UNLIKELY(num_bytes > buffer_end_ - buffer_pos_)) return false;
+    *v = 0; // Ensure unset bytes are initialized to zero.
+    memcpy(v, buffer_pos_, num_bytes);
+    buffer_pos_ += num_bytes;
+    return true;
+}
+
+/// Decodes an unsigned LEB128 integer starting at the current byte into '*v'.
+/// Returns false if the buffer runs out or if the encoding uses more bytes than
+/// max_vlq_byte_len<UINT_T>() allows.
+template <typename UINT_T>
+inline bool BatchedBitReader::GetUleb128(UINT_T* v) {
+    static_assert(std::is_integral<UINT_T>::value, "Integral type required.");
+    static_assert(std::is_unsigned<UINT_T>::value, "Unsigned type required.");
+    static_assert(!std::is_same<UINT_T, bool>::value, "Bools are not supported.");
+
+    *v = 0;
+    const int max_shift = max_vlq_byte_len<UINT_T>() * 7;
+    for (int shift = 0;; shift += 7) {
+        // Too many continuation bytes for the target type.
+        if (UNLIKELY(shift >= max_shift)) return false;
+
+        uint8_t byte = 0;
+        if (!GetBytes(1, &byte)) return false;
+
+        /// Widen 'byte' to UINT_T before masking so that the shifted operand is at least
+        /// as wide as '*v'; otherwise the shift may be too big and lead to undefined
+        /// behaviour.
+        const UINT_T widened_byte = byte;
+        *v |= (widened_byte & 0x7Fu) << shift;
+
+        // A clear high bit marks the last byte of the encoded integer.
+        if ((byte & 0x80u) == 0) return true;
+    }
+}
+
+/// Decodes a ZigZag encoded integer: reads a ULEB-128 value of the matching unsigned
+/// type and maps it back to the signed value. Returns false if the ULEB-128 read fails.
+template <typename INT_T>
+bool BatchedBitReader::GetZigZagInteger(INT_T* v) {
+    static_assert(std::is_integral<INT_T>::value, "Integral type required.");
+    static_assert(std::is_signed<INT_T>::value, "Signed type required.");
+
+    using UINT_T = std::make_unsigned_t<INT_T>;
+
+    UINT_T encoded;
+    const bool read_ok = GetUleb128<UINT_T>(&encoded);
+    if (UNLIKELY(!read_ok)) return false;
+
+    // The lowest bit carries the sign, the remaining bits the magnitude.
+    const auto magnitude = encoded >> 1;
+    const auto sign_mask = -(encoded & 1u);
+    /// Here we rely on implementation defined behaviour in converting UINT_T to INT_T.
+    *v = magnitude ^ sign_mask;
+
+    return true;
+}
+
 } // namespace doris
diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index af50da4ef3..28534b139b 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -167,6 +167,9 @@ public:
     // Returns the rounded up to 64 multiple. Used for conversions of bits to 
i64.
     static inline uint32_t round_up_numi64(uint32_t bits) { return (bits + 63) 
>> 6; }
 
+    // Returns the number of 32-bit words needed to hold 'bits' bits, i.e. 'bits' divided
+    // by 32 and rounded up. Used for conversions of bits to i32.
+    static constexpr uint32_t round_up_numi32(uint32_t bits) {
+        return (bits + 31) / 32;
+    }
+
 #if __BYTE_ORDER == __LITTLE_ENDIAN
     // Converts to big endian format (if not already in big endian).
     static inline int64_t big_endian(int64_t value) { return byte_swap(value); 
}
@@ -313,6 +316,11 @@ public:
         return (value + (factor - 1)) & ~(factor - 1);
     }
 
+    // Rounds 'value' down to a multiple of 'factor' by clearing its low-order bits.
+    // 'factor' must be a positive power of two.
+    static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
+        DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
+        const int64_t low_bits_mask = factor - 1;
+        return value & ~low_bits_mask;
+    }
+
     // Returns the ceil of value/divisor
     static inline int Ceil(int value, int divisor) {
         return value / divisor + (value % divisor != 0);
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index ec0c3ce411..1d80676b9b 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -17,6 +17,8 @@
 
 #include "parquet_common.h"
 
+#include "util/coding.h"
+
 namespace doris::vectorized {
 
 Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type 
encoding,
@@ -24,6 +26,9 @@ Status Decoder::getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type e
     switch (encoding) {
     case tparquet::Encoding::PLAIN:
         switch (type) {
+        case tparquet::Type::BOOLEAN:
+            decoder.reset(new BoolPlainDecoder());
+            break;
         case tparquet::Type::INT32:
             decoder.reset(new PlainDecoder<Int32>());
             break;
@@ -36,6 +41,12 @@ Status Decoder::getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type e
         case tparquet::Type::DOUBLE:
             decoder.reset(new PlainDecoder<Float64>());
             break;
+        case tparquet::Type::BYTE_ARRAY:
+            decoder.reset(new BAPlainDecoder());
+            break;
+        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+            decoder.reset(new FixedLengthBAPlainDecoder());
+            break;
         default:
             return Status::InternalError("Unsupported plain type {} in parquet 
decoder",
                                          tparquet::to_string(type));
@@ -49,12 +60,158 @@ Status Decoder::getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type e
     return Status::OK();
 }
 
-MutableColumnPtr Decoder::getMutableColumnPtr(ColumnPtr& doris_column) {
-    // src column always be nullable for simple converting
+Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) {
     CHECK(doris_column->is_nullable());
     auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
             (*std::move(doris_column)).mutate().get());
-    return nullable_column->get_nested_column_ptr();
+    MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
+    return _decode_values(data_column, num_values);
+}
+
+/// Copies 'num_values' fixed-length values from the page data into 'slice', writing a
+/// '\0' after each value, and advances the read offset past the consumed bytes.
+/// Returns InternalError if the configured type length is negative, and IOError if the
+/// page does not hold enough bytes or 'slice' is too small for the values plus their
+/// terminators.
+Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+    // A negative length would wrap around in the size arithmetic below and could pass
+    // both bounds checks, so reject it up front.
+    if (UNLIKELY(_type_length < 0)) {
+        return Status::InternalError("Invalid type length in parquet fixed length decoder");
+    }
+    const size_t value_length = static_cast<size_t>(_type_length);
+    const size_t to_read_bytes = value_length * num_values;
+    if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+        return Status::IOError("Out-of-bounds access in parquet data decoder");
+    }
+    // insert '\0' into the end of each binary
+    if (UNLIKELY(to_read_bytes + num_values > slice.size)) {
+        return Status::IOError("Slice does not have enough space to write out the decoding data");
+    }
+    size_t slice_offset = 0;
+    for (size_t i = 0; i < num_values; ++i) {
+        memcpy(slice.data + slice_offset, _data->data + _offset, value_length);
+        slice_offset += value_length;
+        slice.data[slice_offset++] = '\0';
+        _offset += value_length;
+    }
+    return Status::OK();
+}
+
+/// Advances the read offset past 'num_values' fixed-length values without decoding them.
+/// Returns InternalError if the configured type length is negative and IOError if the
+/// page does not hold that many values; the offset is left unchanged on failure.
+Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
+    if (UNLIKELY(_type_length < 0)) {
+        return Status::InternalError("Invalid type length in parquet fixed length decoder");
+    }
+    // Validate in size_t before touching '_offset': adding to the 32-bit offset first can
+    // wrap around and slip past the bounds check, and leaves the decoder corrupted when
+    // the check fails.
+    const size_t end_offset = _offset + static_cast<size_t>(_type_length) * num_values;
+    if (UNLIKELY(end_offset > _data->size)) {
+        return Status::IOError("Out-of-bounds access in parquet data decoder");
+    }
+    _offset = static_cast<uint32_t>(end_offset);
+    return Status::OK();
 }
 
+/// Appends 'num_values' fixed-length values from the page data to the string column
+/// 'doris_column', each followed by a '\0' in the character buffer, and advances the
+/// read offset. Returns InternalError if the configured type length is negative and
+/// IOError if the page does not hold enough bytes.
+Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr& doris_column,
+                                                 size_t num_values) {
+    // A negative length would wrap around in the bounds check below.
+    if (UNLIKELY(_type_length < 0)) {
+        return Status::InternalError("Invalid type length in parquet fixed length decoder");
+    }
+    const size_t value_length = static_cast<size_t>(_type_length);
+    if (UNLIKELY(_offset + value_length * num_values > _data->size)) {
+        return Status::IOError("Out-of-bounds access in parquet data decoder");
+    }
+    auto& string_column = assert_cast<ColumnString&>(*doris_column);
+    auto& column_chars_t = string_column.get_chars();
+    auto& column_offsets = string_column.get_offsets();
+    for (size_t i = 0; i < num_values; ++i) {
+        const auto* value_begin = _data->data + _offset;
+        column_chars_t.insert(value_begin, value_begin + value_length);
+        column_chars_t.emplace_back('\0');
+        // The offset entry records the end of the value, terminator included.
+        column_offsets.emplace_back(column_chars_t.size());
+        _offset += value_length;
+    }
+    return Status::OK();
+}
+
+/// Decodes 'num_values' length-prefixed byte arrays from the page data into 'slice'.
+/// Each value is stored as a 4-byte little-endian length followed by that many bytes;
+/// the bytes are copied to 'slice' with a '\0' appended after each value.
+/// Returns IOError if the page data is truncated or 'slice' is too small.
+Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+    size_t slice_offset = 0;
+    for (size_t i = 0; i < num_values; ++i) {
+        if (UNLIKELY(static_cast<size_t>(_offset) + 4 > _data->size)) {
+            return Status::IOError("Can't read byte array length from plain decoder");
+        }
+        const uint32_t length =
+                decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
+        _offset += 4;
+        // The whole comparison must sit inside UNLIKELY(): the hint macro would otherwise
+        // reduce '_offset + length' to a boolean and the bounds check would never trigger.
+        // The sum is formed in size_t so it cannot wrap around in 32 bits.
+        if (UNLIKELY(static_cast<size_t>(_offset) + length > _data->size)) {
+            return Status::IOError("Can't read enough bytes in plain decoder");
+        }
+        // Room is needed for the value and its trailing '\0'.
+        if (UNLIKELY(slice_offset + length + 1 > slice.size)) {
+            return Status::IOError("Slice does not have enough space to write out the decoding data");
+        }
+        memcpy(slice.data + slice_offset, _data->data + _offset, length);
+        slice_offset += length;
+        slice.data[slice_offset++] = '\0';
+        _offset += length;
+    }
+    return Status::OK();
+}
+
+/// Advances the read offset past 'num_values' length-prefixed byte arrays without
+/// copying them. Each value is a 4-byte little-endian length followed by that many bytes.
+/// Returns IOError if the page data is truncated.
+Status BAPlainDecoder::skip_values(size_t num_values) {
+    for (size_t i = 0; i < num_values; ++i) {
+        if (UNLIKELY(static_cast<size_t>(_offset) + 4 > _data->size)) {
+            return Status::IOError("Can't read byte array length from plain decoder");
+        }
+        const uint32_t length =
+                decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
+        _offset += 4;
+        // The whole comparison must sit inside UNLIKELY(): the hint macro would otherwise
+        // reduce '_offset + length' to a boolean and the bounds check would never trigger.
+        // The sum is formed in size_t so it cannot wrap around in 32 bits.
+        if (UNLIKELY(static_cast<size_t>(_offset) + length > _data->size)) {
+            return Status::IOError("Can't skip enough bytes in plain decoder");
+        }
+        _offset += length;
+    }
+    return Status::OK();
+}
+
+/// Appends 'num_values' length-prefixed byte arrays from the page data to the string
+/// column 'doris_column', each followed by a '\0' in the character buffer.
+/// Each value is a 4-byte little-endian length followed by that many bytes.
+/// Returns IOError if the page data is truncated.
+Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
+    auto& string_column = assert_cast<ColumnString&>(*doris_column);
+    auto& column_chars_t = string_column.get_chars();
+    auto& column_offsets = string_column.get_offsets();
+    for (size_t i = 0; i < num_values; ++i) {
+        if (UNLIKELY(static_cast<size_t>(_offset) + 4 > _data->size)) {
+            return Status::IOError("Can't read byte array length from plain decoder");
+        }
+        const uint32_t length =
+                decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
+        _offset += 4;
+        // The whole comparison must sit inside UNLIKELY(): the hint macro would otherwise
+        // reduce '_offset + length' to a boolean and the bounds check would never trigger.
+        // The sum is formed in size_t so it cannot wrap around in 32 bits.
+        if (UNLIKELY(static_cast<size_t>(_offset) + length > _data->size)) {
+            return Status::IOError("Can't read enough bytes in plain decoder");
+        }
+        const auto* value_begin = _data->data + _offset;
+        column_chars_t.insert(value_begin, value_begin + length);
+        column_chars_t.emplace_back('\0');
+        // The offset entry records the end of the value, terminator included.
+        column_offsets.emplace_back(column_chars_t.size());
+        _offset += length;
+    }
+    return Status::OK();
+}
+
+/// Decodes 'num_values' booleans and writes each one as a single byte (1 or 0) into
+/// 'slice'. Returns IOError if 'slice' holds fewer than 'num_values' bytes or if the
+/// bit stream runs out of values.
+Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+    // One output byte is written per value; check the capacity before decoding anything.
+    if (UNLIKELY(num_values > slice.size)) {
+        return Status::IOError("Slice does not have enough space to write out the decoding data");
+    }
+    bool value = false;
+    for (size_t i = 0; i < num_values; ++i) {
+        if (UNLIKELY(!_decode_value(&value))) {
+            return Status::IOError("Can't read enough booleans in plain decoder");
+        }
+        slice.data[i] = value ? 1 : 0;
+    }
+    return Status::OK();
+}
+
+/// Skips 'num_values' booleans: first the ones already unpacked into the local buffer,
+/// then whole groups of 32 directly in the bit stream, and finally the remainder by
+/// unpacking a fresh batch and positioning the buffer index after the skipped values.
+/// Returns IOError if the bit stream does not hold enough values.
+Status BoolPlainDecoder::skip_values(size_t num_values) {
+    const size_t num_cached = static_cast<size_t>(num_unpacked_values_ - unpacked_value_idx_);
+    const size_t skip_cached = std::min(num_cached, num_values);
+    unpacked_value_idx_ += static_cast<int>(skip_cached);
+    if (skip_cached == num_values) {
+        return Status::OK();
+    }
+    int num_remaining = static_cast<int>(num_values - skip_cached);
+    // A multiple of 32 one-bit values occupies a whole number of bytes, which is what
+    // SkipBatch() expects when more values follow.
+    const int num_to_skip = static_cast<int>(BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+    // SkipBatch() reports a short buffer through its return value; it must not be ignored,
+    // otherwise the skip silently succeeds without having advanced the stream.
+    if (num_to_skip > 0 && UNLIKELY(!bool_values_.SkipBatch(1, num_to_skip))) {
+        return Status::IOError("Can't skip enough booleans in plain decoder");
+    }
+    num_remaining -= num_to_skip;
+    if (num_remaining > 0) {
+        DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN);
+        num_unpacked_values_ =
+                bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+        if (UNLIKELY(num_unpacked_values_ < num_remaining)) {
+            // Mark the refilled buffer as consumed so the indices stay consistent.
+            unpacked_value_idx_ = num_unpacked_values_;
+            return Status::IOError("Can't skip enough booleans in plain decoder");
+        }
+        unpacked_value_idx_ = num_remaining;
+    }
+    return Status::OK();
+}
+
+/// Decodes 'num_values' booleans one at a time and appends them to the UInt8 column
+/// 'doris_column'. Returns IOError as soon as the bit stream runs out of values.
+Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
+    auto& bool_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
+    bool current;
+    size_t decoded = 0;
+    while (decoded < num_values) {
+        const bool got_value = _decode_value(&current);
+        if (UNLIKELY(!got_value)) {
+            return Status::IOError("Can't read enough booleans in plain decoder");
+        }
+        bool_data.emplace_back(current);
+        ++decoded;
+    }
+    return Status::OK();
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index 0f0e9f6e3d..44523ae22d 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -21,8 +21,10 @@
 
 #include "common/status.h"
 #include "gen_cpp/parquet_types.h"
+#include "util/bit_stream_utils.inline.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
 
 namespace doris::vectorized {
 
@@ -36,25 +38,25 @@ public:
     static Status getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type encoding,
                              std::unique_ptr<Decoder>& decoder);
 
-    static MutableColumnPtr getMutableColumnPtr(ColumnPtr& doris_column);
-
     // The type with fix length
     void set_type_length(int32_t type_length) { _type_length = type_length; }
 
     // Set the data to be decoded
-    void set_data(Slice* data) {
+    virtual void set_data(Slice* data) {
         _data = data;
         _offset = 0;
     }
 
     // Write the decoded values batch to doris's column
-    virtual Status decode_values(ColumnPtr& doris_column, size_t num_values) = 
0;
+    Status decode_values(ColumnPtr& doris_column, size_t num_values);
 
     virtual Status decode_values(Slice& slice, size_t num_values) = 0;
 
     virtual Status skip_values(size_t num_values) = 0;
 
 protected:
+    virtual Status _decode_values(MutableColumnPtr& doris_column, size_t 
num_values) = 0;
+
     int32_t _type_length;
     Slice* _data = nullptr;
     uint32_t _offset = 0;
@@ -66,19 +68,6 @@ public:
     PlainDecoder() = default;
     ~PlainDecoder() override = default;
 
-    Status decode_values(ColumnPtr& doris_column, size_t num_values) override {
-        size_t to_read_bytes = TYPE_LENGTH * num_values;
-        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data 
decoder");
-        }
-        auto data_column = getMutableColumnPtr(doris_column);
-        auto& column_data = 
static_cast<ColumnVector<T>&>(*data_column).get_data();
-        const auto* raw_data = reinterpret_cast<const T*>(_data->data + 
_offset);
-        column_data.insert(raw_data, raw_data + num_values);
-        _offset += to_read_bytes;
-        return Status::OK();
-    }
-
     Status decode_values(Slice& slice, size_t num_values) override {
         size_t to_read_bytes = TYPE_LENGTH * num_values;
         if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
@@ -103,6 +92,98 @@ public:
 
 protected:
     enum { TYPE_LENGTH = sizeof(T) };
+
+    // Appends 'num_values' values of type T, read from the page data at the current
+    // offset, to 'doris_column' and advances the offset past them.
+    // Returns IOError if the page data does not hold that many values.
+    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override {
+        const size_t bytes_needed = TYPE_LENGTH * num_values;
+        const bool out_of_bounds = _offset + bytes_needed > _data->size;
+        if (UNLIKELY(out_of_bounds)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        // The page bytes are viewed as an array of T and appended in one call.
+        const auto* first = reinterpret_cast<const T*>(_data->data + _offset);
+        const auto* last = first + num_values;
+        static_cast<ColumnVector<T>&>(*doris_column).get_data().insert(first, last);
+        _offset += bytes_needed;
+        return Status::OK();
+    }
+};
+
+/// Decoder for PLAIN-encoded FIXED_LEN_BYTE_ARRAY values. Every value occupies
+/// '_type_length' bytes in the page data, so set_type_length() must be called before
+/// decoding.
+class FixedLengthBAPlainDecoder final : public Decoder {
+public:
+    FixedLengthBAPlainDecoder() = default;
+    ~FixedLengthBAPlainDecoder() override = default;
+
+    /// Copies 'num_values' values into 'slice', appending a '\0' after each value.
+    Status decode_values(Slice& slice, size_t num_values) override;
+
+    /// Advances the read offset past 'num_values' values without decoding them.
+    Status skip_values(size_t num_values) override;
+
+protected:
+    /// Appends 'num_values' values to the string column 'doris_column'.
+    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override;
+};
+
+/// Decoder for PLAIN-encoded BYTE_ARRAY values. Each value is stored in the page data
+/// as a 4-byte little-endian length followed by that many bytes.
+class BAPlainDecoder final : public Decoder {
+public:
+    BAPlainDecoder() = default;
+    ~BAPlainDecoder() override = default;
+
+    /// Copies 'num_values' values into 'slice', appending a '\0' after each value.
+    Status decode_values(Slice& slice, size_t num_values) override;
+
+    /// Advances the read offset past 'num_values' values without decoding them.
+    Status skip_values(size_t num_values) override;
+
+protected:
+    /// Appends 'num_values' values to the string column 'doris_column'.
+    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override;
+};
+
+/// Decoder for PLAIN-encoded boolean values, which are bit-packed with one bit per value.
+/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h
+class BoolPlainDecoder final : public Decoder {
+public:
+    BoolPlainDecoder() = default;
+    ~BoolPlainDecoder() override = default;
+
+    // Set the data to be decoded and drop any values still buffered from previous data.
+    void set_data(Slice* data) override {
+        // Keep the base-class members in the same state Decoder::set_data() leaves them in.
+        _data = data;
+        _offset = 0;
+        bool_values_.Reset(reinterpret_cast<const uint8_t*>(data->data), data->size);
+        num_unpacked_values_ = 0;
+        unpacked_value_idx_ = 0;
+    }
+
+    /// Decodes 'num_values' booleans into 'slice', one byte (1 or 0) per value.
+    Status decode_values(Slice& slice, size_t num_values) override;
+
+    /// Skips 'num_values' booleans without returning them.
+    Status skip_values(size_t num_values) override;
+
+protected:
+    /// Stores the next boolean in '*value'. Values are served from 'unpacked_values_';
+    /// when it is exhausted a new batch is unpacked from the bit stream.
+    /// Returns false if no more values can be unpacked.
+    inline bool _decode_value(bool* value) {
+        if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+            *value = unpacked_values_[unpacked_value_idx_++];
+        } else {
+            num_unpacked_values_ =
+                    bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+            if (UNLIKELY(num_unpacked_values_ == 0)) {
+                return false;
+            }
+            *value = unpacked_values_[0];
+            unpacked_value_idx_ = 1;
+        }
+        return true;
+    }
+
+    /// Appends 'num_values' booleans to the UInt8 column 'doris_column'.
+    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
+
+    /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
+    /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because
+    /// bit unpacking is only supported for unsigned integers. The values are converted to
+    /// bool when returned to the user.
+    /// Declared constexpr (implicitly inline) so that ODR-uses such as
+    /// DCHECK_LE(n, UNPACKED_BUFFER_LEN) need no out-of-class definition.
+    static constexpr int UNPACKED_BUFFER_LEN = 128;
+    uint8_t unpacked_values_[UNPACKED_BUFFER_LEN];
+
+    /// The number of valid values in 'unpacked_values_'.
+    int num_unpacked_values_ = 0;
+
+    /// The next value to return from 'unpacked_values_'.
+    int unpacked_value_idx_ = 0;
+
+    /// Bit-packed reader over the data passed to set_data().
+    BatchedBitReader bool_values_;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 11d6bfaf94..d4c7f534a3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -86,7 +86,15 @@ Status ColumnChunkReader::load_page_data() {
     if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
         encoding = tparquet::Encoding::RLE_DICTIONARY;
     }
-    Decoder::getDecoder(_metadata.type, encoding, _page_decoder);
+    // Reuse page decoder
+    if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) {
+        _page_decoder = _decoders[static_cast<int>(encoding)].get();
+    } else {
+        std::unique_ptr<Decoder> page_decoder;
+        Decoder::getDecoder(_metadata.type, encoding, page_decoder);
+        _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
+        _page_decoder = _decoders[static_cast<int>(encoding)].get();
+    }
     _page_decoder->set_data(&_page_data);
     if (_type_length > 0) {
         _page_decoder->set_type_length(_type_length);
@@ -122,12 +130,12 @@ Status ColumnChunkReader::skip_values(size_t num_values) {
 
 size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
     DCHECK_GT(_max_rep_level, 0);
-    return _def_level_decoder.get_levels(levels, n);
+    return _rep_level_decoder.get_levels(levels, n);
 }
 
 size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
     DCHECK_GT(_max_def_level, 0);
-    return _rep_level_decoder.get_levels(levels, n);
+    return _def_level_decoder.get_levels(levels, n);
 }
 
 Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t 
num_values) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index c3b58be5c0..282612bd21 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -127,7 +127,10 @@ private:
     Slice _page_data;
     std::unique_ptr<uint8_t[]> _decompress_buf;
     size_t _decompress_buf_size = 0;
-    std::unique_ptr<Decoder> _page_decoder = nullptr;
+    Decoder* _page_decoder = nullptr;
+    // Map: encoding -> Decoder
+    // Plain or Dictionary encoding. If the dictionary grows too big, the 
encoding will fall back to the plain encoding
+    std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
     size_t _type_length = -1;
 };
 
diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet 
b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet
index e4b5820161..5f679c0005 100644
Binary files a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet and 
b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 02bcef6261..db91103c88 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -124,58 +124,147 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
 }
 
-TEST_F(ParquetThriftReaderTest, column_reader) {
-    // type-decoder.parquet is the part of following table:
-    // create table type-decoder (
-    //   int_col int)
-    // TODO(gaoxin): add more hive types
+static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk* column_chunk,
+                                FieldSchema* field_schema, Slice& slice) {
+    tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
+    size_t start_offset = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_size = chunk_meta.total_compressed_size;
+    BufferedFileStreamReader stream_reader(file_reader, start_offset, 
chunk_size);
+
+    ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema);
+    // initialize chunk reader
+    chunk_reader.init();
+    // seek to next page header
+    chunk_reader.next_page();
+    // load page data into underlying container
+    chunk_reader.load_page_data();
+    // decode page data
+    return chunk_reader.decode_values(slice, chunk_reader.num_values());
+}
+
+TEST_F(ParquetThriftReaderTest, type_decoder) {
+    /*
+     * type-decoder.parquet is the part of following table:
+     * create table `type_decoder`(
+     * `tinyint_col` tinyint, // 0
+     * `smallint_col` smallint, // 1
+     * `int_col` int, // 2
+     * `bigint_col` bigint, // 3
+     * `boolean_col` boolean, // 4
+     * `float_col` float, // 5
+     * `double_col` double, // 6
+     * `string_col` string, // 7
+     * `binary_col` binary, // 8
+     * `timestamp_col` timestamp, // 9
+     * `decimal_col` decimal(10,2), // 10
+     * `char_col` char(10), // 11
+     * `varchar_col` varchar(50), // 12
+     * `date_col` date, // 13
+     * `list_string` array<string>) // 14
+     */
     LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    /*
+     * Data in type-decoder.parquet:
+     * -1      -1      -1      -1      false   -1.14   -1.14   s-row0  b-row0  2022-08-01 00:00:00     -1.14   c-row0          vc-row0 2022-08-01      ["as-0","as-1"]
+     * 2       2       2       2       true    2.14    2.14    NULL    b-row1  2022-08-02 00:00:00     2.14    c-row1          vc-row1 2022-08-02      [null,"as-3"]
+     * -3      -3      -3      -3      false   -3.14   -3.14   s-row2  b-row2  2022-08-03 00:00:00     -3.14   c-row2          vc-row2 2022-08-03      []
+     * 4       4       4       4       true    4.14    4.14    NULL    b-row3  2022-08-04 00:00:00     4.14    c-row3          vc-row3 2022-08-04      ["as-4"]
+     * -5      -5      -5      -5      false   -5.14   -5.14   s-row4  b-row4  2022-08-05 00:00:00     -5.14   c-row4          vc-row4 2022-08-05      ["as-5",null]
+     * 6       6       6       6       false   6.14    6.14    s-row5  b-row5  2022-08-06 00:00:00     6.14    c-row5          vc-row5 2022-08-06      [null,null]
+     * -7      -7      -7      -7      true    -7.14   -7.14   s-row6  b-row6  2022-08-07 00:00:00     -7.14   c-row6          vc-row6 2022-08-07      ["as-6","as-7"]
+     * 8       8       8       8       false   8.14    8.14    NULL    b-row7  2022-08-08 00:00:00     8.14    c-row7          vc-row7 2022-08-08      ["as-0","as-8"]
+     * -9      -9      -9      -9      false   -9.14   -9.14   s-row8  b-row8  2022-08-09 00:00:00     -9.14   c-row8          vc-row8 2022-08-09      ["as-9","as-10"]
+     * 10      10      10      10      false   10.14   10.14   s-row9  b-row9  2022-08-10 00:00:00     10.14   c-row9          vc-row9 2022-08-10      ["as-11","as-12"]
+     */
     auto st = reader.open();
     EXPECT_TRUE(st.ok());
 
     std::shared_ptr<FileMetaData> metaData;
     parse_thrift_footer(&reader, metaData);
     tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
-
-    // read the `int_col` column, it's the int-type column, and has ten values:
-    // -1, 2, -3, 4, -5, 6, -7, 8, -9, 10
-    tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[0];
-    tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
-    size_t start_offset = chunk_meta.__isset.dictionary_page_offset
-                                  ? chunk_meta.dictionary_page_offset
-                                  : chunk_meta.data_page_offset;
-    size_t chunk_size = chunk_meta.total_compressed_size;
-    BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
-
     FieldDescriptor schema_descriptor;
     schema_descriptor.parse_from_thrift(t_metadata.schema);
-    auto field_schema = 
const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+    int rows = 10;
 
-    ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, 
field_schema);
-    size_t batch_size = 10;
-    size_t int_length = 4;
-    char data[batch_size * int_length];
-    Slice slice(data, batch_size * int_length);
-    chunk_reader.init();
-    uint64_t int_sum = 0;
-    while (chunk_reader.has_next_page()) {
+    // the physical_type of tinyint_col, smallint_col and int_col are all INT32
+    // they are distinguished by converted_type(in FieldSchema.parquet_schema.converted_type)
+    for (int col_idx = 0; col_idx < 3; ++col_idx) {
+        char data[4 * rows];
+        Slice slice(data, 4 * rows);
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice);
+        auto out_data = reinterpret_cast<int32_t*>(data);
+        int int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += out_data[i];
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    // `bigint_col` bigint, // 3
+    {
+        char data[8 * rows];
+        Slice slice(data, 8 * rows);
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice);
+        auto out_data = reinterpret_cast<int64_t*>(data);
+        int int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += out_data[i];
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    // `boolean_col` boolean, // 4
+    {
+        char data[1 * rows];
+        Slice slice(data, 1 * rows);
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice);
+        auto out_data = reinterpret_cast<bool*>(data);
+        ASSERT_FALSE(out_data[0]);
+        ASSERT_TRUE(out_data[1]);
+        ASSERT_FALSE(out_data[2]);
+        ASSERT_TRUE(out_data[3]);
+        ASSERT_FALSE(out_data[4]);
+        ASSERT_FALSE(out_data[5]);
+        ASSERT_TRUE(out_data[6]);
+        ASSERT_FALSE(out_data[7]);
+        ASSERT_FALSE(out_data[8]);
+        ASSERT_FALSE(out_data[9]);
+    }
+    // `string_col` string, // 7
+    {
+        tparquet::ColumnChunk column_chunk = 
t_metadata.row_groups[0].columns[7];
+        tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
+        size_t start_offset = chunk_meta.__isset.dictionary_page_offset
+                                      ? chunk_meta.dictionary_page_offset
+                                      : chunk_meta.data_page_offset;
+        size_t chunk_size = chunk_meta.total_compressed_size;
+        BufferedFileStreamReader stream_reader(&reader, start_offset, 
chunk_size);
+
+        ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
+                                       
const_cast<FieldSchema*>(schema_descriptor.get_column(7)));
+        // initialize chunk reader
+        chunk_reader.init();
         // seek to next page header
         chunk_reader.next_page();
-        // load data to decoder
+        // load page data into underlying container
         chunk_reader.load_page_data();
-        while (chunk_reader.num_values() > 0) {
-            size_t num_values = chunk_reader.num_values() < batch_size
-                                        ? chunk_reader.num_values() < 
batch_size
-                                        : batch_size;
-            chunk_reader.decode_values(slice, num_values);
-            auto out_data = reinterpret_cast<Int32*>(slice.data);
-            for (int i = 0; i < num_values; i++) {
-                Int32 value = out_data[i];
-                int_sum += value;
-            }
-        }
+
+        char data[50 * rows];
+        Slice slice(data, 50 * rows);
+        level_t defs[rows];
+        // Analyze null string
+        chunk_reader.get_def_levels(defs, rows);
+        ASSERT_EQ(defs[1], 0);
+        ASSERT_EQ(defs[3], 0);
+        ASSERT_EQ(defs[7], 0);
+
+        chunk_reader.decode_values(slice, 7);
+        ASSERT_STREQ("s-row0", slice.data);
+        ASSERT_STREQ("s-row2", slice.data + 7);
     }
-    ASSERT_EQ(int_sum, 5);
 }
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to