This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8f5aed27ec [feature-wip](parquet-reader)read and decode parquet physical type (#11637) 8f5aed27ec is described below commit 8f5aed27ecdf01028393c0888acc8e1de3b1eeb1 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Thu Aug 11 10:17:32 2022 +0800 [feature-wip](parquet-reader)read and decode parquet physical type (#11637) # Proposed changes Read and decode parquet physical type. 1. The encoding type of boolean is bit-packing, this PR introduces the implementation of bit-packing from Impala 2. Create a parquet including all the primitive types supported by hive ## Remaining Problems 1. At present, only physical types are decoded, and there is no corresponding and conversion methods with doris logical. 2. No parsing and processing Decimal type / Timestamp / Date. 3. Int_8 / Int_16 is stored as Int_32. How to resolve these types. --- be/src/util/bit_packing.h | 140 ++++++++ be/src/util/bit_packing.inline.h | 380 +++++++++++++++++++++ be/src/util/bit_stream_utils.h | 99 ++++++ be/src/util/bit_stream_utils.inline.h | 101 ++++++ be/src/util/bit_util.h | 8 + be/src/vec/exec/format/parquet/parquet_common.cpp | 163 ++++++++- be/src/vec/exec/format/parquet/parquet_common.h | 115 ++++++- .../parquet/vparquet_column_chunk_reader.cpp | 14 +- .../format/parquet/vparquet_column_chunk_reader.h | 5 +- .../test_data/parquet_scanner/type-decoder.parquet | Bin 338 -> 3405 bytes be/test/vec/exec/parquet/parquet_thrift_test.cpp | 165 ++++++--- 11 files changed, 1128 insertions(+), 62 deletions(-) diff --git a/be/src/util/bit_packing.h b/be/src/util/bit_packing.h new file mode 100644 index 0000000000..61a389512a --- /dev/null +++ b/be/src/util/bit_packing.h @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// the implement of BitPacking is from impala + +#pragma once + +#include "util/bit_util.h" + +namespace doris { + +/// Utilities for manipulating bit-packed values. Bit-packing is a technique for +/// compressing integer values that do not use the full range of the integer type. +/// E.g. an array of uint32_t values with range [0, 31] only uses the lower 5 bits +/// of every uint32_t value, or an array of 0/1 booleans only uses the lowest bit +/// of each integer. +/// +/// Bit-packing always has a "bit width" parameter that determines the range of +/// representable unsigned values: [0, 2^bit_width - 1]. The packed representation +/// is logically the concatenatation of the lower bits of the input values (in +/// little-endian order). E.g. the values 1, 2, 3, 4 packed with bit width 4 results +/// in the two output bytes: [ 0 0 1 0 | 0 0 0 1 ] [ 0 1 0 0 | 0 0 1 1 ] +/// 2 1 4 3 +/// +/// Packed values can be split across words, e.g. packing 1, 17 with bit_width 5 results +/// in the two output bytes: [ 0 0 1 | 0 0 0 0 1 ] [ x x x x x x | 1 0 ] +/// lower bits of 17--^ 1 next value ^--upper bits of 17 +/// +/// Bit widths from 0 to 64 are supported (0 bit width means that every value is 0). +/// The batched unpacking functions operate on batches of 32 values. 
This batch size +/// is convenient because for every supported bit width, the end of a 32 value batch +/// falls on a byte boundary. It is also large enough to amortise loop overheads. +class BitPacking { +public: + static constexpr int MAX_BITWIDTH = sizeof(uint64_t) * 8; + static constexpr int MAX_DICT_BITWIDTH = sizeof(uint32_t) * 8; + + /// Unpack bit-packed values with 'bit_width' from 'in' to 'out'. Keeps unpacking until + /// either all 'in_bytes' are read or 'num_values' values are unpacked. 'out' must have + /// enough space for 'num_values'. 0 <= 'bit_width' <= 64 and 'bit_width' <= # of bits + /// in OutType. 'in' must point to 'in_bytes' of addressable memory. + /// + /// Returns a pointer to the byte after the last byte of 'in' that was read and also the + /// number of values that were read. If the caller wants to continue reading packed + /// values after the last one returned, it must ensure that the next value to unpack + /// starts at a byte boundary. This is true if 'num_values' is a multiple of 32, or + /// more generally if (bit_width * num_values) % 8 == 0. + template <typename OutType> + static std::pair<const uint8_t*, int64_t> UnpackValues(int bit_width, + const uint8_t* __restrict__ in, + int64_t in_bytes, int64_t num_values, + OutType* __restrict__ out); + + /// Same as above, templated by BIT_WIDTH. + template <typename OutType, int BIT_WIDTH> + static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* __restrict__ in, + int64_t in_bytes, int64_t num_values, + OutType* __restrict__ out); + + /// Unpack values as above, treating them as unsigned integers, and decode them + /// using the provided dict. Writes them to 'out' with a stride of 'stride' bytes. + /// Sets 'decode_error' to true if one of the packed values was greater than 'dict_len'. + /// Does not modify 'decode_error' on success. 
+ template <typename OutType> + static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues( + int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, + OutType* __restrict__ dict, int64_t dict_len, int64_t num_values, + OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error); + + /// Same as above, templated by BIT_WIDTH. + template <typename OutType, int BIT_WIDTH> + static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues( + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error); + + /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to + /// 'in_bytes' of addressable memory, and 'in_bytes' must be at least + /// (32 * bit_width / 8). 'out' must have space for 32 OutType values. + /// 0 <= 'bit_width' <= 64 and 'bit_width' <= # of bits in OutType. + template <typename OutType> + static const uint8_t* Unpack32Values(int bit_width, const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ out); + + /// Same as Unpack32Values() but templated by BIT_WIDTH. + template <typename OutType, int BIT_WIDTH> + static const uint8_t* Unpack32Values(const uint8_t* __restrict__ in, int64_t in_bytes, + OutType* __restrict__ out); + + /// Same as Unpack32Values() with dictionary decoding. + template <typename OutType> + static const uint8_t* UnpackAndDecode32Values(int bit_width, const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, OutType* __restrict__ out, + int64_t stride, bool* __restrict__ decode_error); + + /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH. 
+ template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in, int64_t in_bytes, + OutType* __restrict__ dict, int64_t dict_len, + OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error); + + /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'. + /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable + /// memory, and 'in_bytes' must be at least ceil(num_values * bit_width / 8). + /// 'out' must have space for 'num_values' OutType values. + /// 0 <= 'bit_width' <= 64 and 'bit_width' <= # of bits in OutType. + template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in, int64_t in_bytes, + int num_values, OutType* __restrict__ out); + + /// Same as UnpackUpTo31Values() with dictionary decoding. + template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int num_values, + OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error); + +private: + /// Compute the number of values with the given bit width that can be unpacked from + /// an input buffer of 'in_bytes' into an output buffer with space for 'num_values'. + static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values); +}; +} // namespace doris diff --git a/be/src/util/bit_packing.inline.h b/be/src/util/bit_packing.inline.h new file mode 100644 index 0000000000..51efbb125d --- /dev/null +++ b/be/src/util/bit_packing.inline.h @@ -0,0 +1,380 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// the implement of BitPacking is from impala + +#include <boost/preprocessor/repetition/repeat_from_to.hpp> + +#include "util/bit_packing.h" + +namespace doris { + +inline int64_t BitPacking::NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values) { + // Check if we have enough input bytes to decode 'num_values'. + if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= in_bytes) { + // Limited by output space. + return num_values; + } else { + // Limited by the number of input bytes. Compute the number of values that can be + // unpacked from the input. 
+ return (in_bytes * CHAR_BIT) / bit_width; + } +} + +template <typename T> +constexpr bool IsSupportedUnpackingType() { + return std::is_same<T, uint8_t>::value || std::is_same<T, uint16_t>::value || + std::is_same<T, uint32_t>::value || std::is_same<T, uint64_t>::value; +} + +template <typename OutType> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width, + const uint8_t* __restrict__ in, + int64_t in_bytes, int64_t num_values, + OutType* __restrict__ out) { + static_assert(IsSupportedUnpackingType<OutType>(), "Only unsigned integers are supported."); + +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case i: \ + return UnpackValues<OutType, i>(in, in_bytes, num_values, out); + + switch (bit_width) { + // Expand cases from 0 to 64. + BOOST_PP_REPEAT_FROM_TO(0, 65, UNPACK_VALUES_CASE, ignore); + default: + DCHECK(false); + return std::make_pair(nullptr, -1); + } +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(const uint8_t* __restrict__ in, + int64_t in_bytes, int64_t num_values, + OutType* __restrict__ out) { + static_assert(IsSupportedUnpackingType<OutType>(), "Only unsigned integers are supported."); + + constexpr int BATCH_SIZE = 32; + const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values); + const int64_t batches_to_read = values_to_read / BATCH_SIZE; + const int64_t remainder_values = values_to_read % BATCH_SIZE; + const uint8_t* in_pos = in; + OutType* out_pos = out; + + // First unpack as many full batches as possible. + for (int64_t i = 0; i < batches_to_read; ++i) { + in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos); + out_pos += BATCH_SIZE; + in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; + } + + // Then unpack the final partial batch. 
+ if (remainder_values > 0) { + in_pos = + UnpackUpTo31Values<OutType, BIT_WIDTH>(in_pos, in_bytes, remainder_values, out_pos); + } + return std::make_pair(in_pos, values_to_read); +} + +template <typename OutType> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues( + int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error) { +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case i: \ + return UnpackAndDecodeValues<OutType, i>(in, in_bytes, dict, dict_len, num_values, out, \ + stride, decode_error); + + switch (bit_width) { + // Expand cases from 0 to MAX_DICT_BITWIDTH. + BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore); + default: + DCHECK(false); + return std::make_pair(nullptr, -1); + } +#pragma pop_macro("UNPACK_VALUES_CASE") +} +template <typename OutType, int BIT_WIDTH> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues( + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error) { + constexpr int BATCH_SIZE = 32; + const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values); + const int64_t batches_to_read = values_to_read / BATCH_SIZE; + const int64_t remainder_values = values_to_read % BATCH_SIZE; + const uint8_t* in_pos = in; + uint8_t* out_pos = reinterpret_cast<uint8_t*>(out); + // First unpack as many full batches as possible. 
+ for (int64_t i = 0; i < batches_to_read; ++i) { + in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, dict, dict_len, + reinterpret_cast<OutType*>(out_pos), + stride, decode_error); + out_pos += stride * BATCH_SIZE; + in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; + } + // Then unpack the final partial batch. + if (remainder_values > 0) { + in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>( + in_pos, in_bytes, dict, dict_len, remainder_values, + reinterpret_cast<OutType*>(out_pos), stride, decode_error); + } + return std::make_pair(in_pos, values_to_read); +} + +// Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit width of +// the packed values. 'in_buf' is the start of the input buffer and 'out_vals' is the +// start of the output values array. This function unpacks the VALUE_IDX'th packed value +// from 'in_buf'. +// +// This implements essentially the same algorithm as the (Apache-licensed) code in +// bpacking.c at https://github.com/lemire/FrameOfReference/, but is much more compact +// because it uses templates rather than source-level unrolling of all combinations. +// +// After the template parameters are expanded and constants are propagated, all branches +// and offset/shift calculations should be optimized out, leaving only shifts by constants +// and bitmasks by constants. Calls to this must be stamped out manually or with +// BOOST_PP_REPEAT_FROM_TO: experimentation revealed that the GCC 4.9.2 optimiser was +// not able to fully propagate constants and remove branches when this was called from +// inside a for loop with constant bounds with VALUE_IDX changed to a function argument. +// +// We compute how many 32 bit words we have to read, which is either 1, 2 or 3. If it is +// at least 2, the first two 32 bit words are read as one 64 bit word. 
Even if only one +// word needs to be read, we try to read 64 bits if it does not lead to buffer overflow +// because benchmarks show that it has a positive effect on performance. +// +// If 'FULL_BATCH' is true, this function call is part of unpacking 32 values, otherwise +// up to 31 values. This is needed to optimise the length of the reads (32 or 64 bits) and +// avoid buffer overflow (if we are unpacking 32 values, we can safely assume an input +// buffer of length 32 * BIT_WIDTH). +template <int BIT_WIDTH, int VALUE_IDX, bool FULL_BATCH> +inline uint64_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) { + if (BIT_WIDTH == 0) return 0; + + constexpr int FIRST_BIT_IDX = VALUE_IDX * BIT_WIDTH; + constexpr int FIRST_WORD_IDX = FIRST_BIT_IDX / 32; + constexpr int LAST_BIT_IDX = FIRST_BIT_IDX + BIT_WIDTH; + constexpr int LAST_WORD_IDX = BitUtil::round_up_numi32(LAST_BIT_IDX); + constexpr int WORDS_TO_READ = LAST_WORD_IDX - FIRST_WORD_IDX; + static_assert(WORDS_TO_READ <= 3, "At most three 32-bit words need to be loaded."); + + constexpr int FIRST_BIT_OFFSET = FIRST_BIT_IDX - FIRST_WORD_IDX * 32; + constexpr uint64_t mask = BIT_WIDTH == 64 ? ~0L : (1UL << BIT_WIDTH) - 1; + const uint32_t* const in = reinterpret_cast<const uint32_t*>(in_buf); + + // Avoid reading past the end of the buffer. We can safely read 64 bits if we know that + // this is a full batch read (so the input buffer is 32 * BIT_WIDTH long) and there is + // enough space in the buffer from the current reading point. + // We try to read 64 bits even when it is not necessary because the benchmarks show it + // is faster. + constexpr bool CAN_SAFELY_READ_64_BITS = + FULL_BATCH && FIRST_BIT_IDX - FIRST_BIT_OFFSET + 64 <= BIT_WIDTH * 32; + + // We do not try to read 64 bits when the bit width is a power of two (unless it is + // necessary) because performance benchmarks show that it is better this way. 
This seems + // to be due to compiler optimisation issues, so we can revisit it when we update the + // compiler version. + constexpr bool READ_32_BITS = + WORDS_TO_READ == 1 && (!CAN_SAFELY_READ_64_BITS || BitUtil::IsPowerOf2(BIT_WIDTH)); + + if (READ_32_BITS) { + uint32_t word = in[FIRST_WORD_IDX]; + word >>= FIRST_BIT_OFFSET < 32 ? FIRST_BIT_OFFSET : 0; + return word & mask; + } + + uint64_t word = *reinterpret_cast<const uint64_t*>(in + FIRST_WORD_IDX); + word >>= FIRST_BIT_OFFSET; + + if (WORDS_TO_READ > 2) { + constexpr int USEFUL_BITS = FIRST_BIT_OFFSET == 0 ? 0 : 64 - FIRST_BIT_OFFSET; + uint64_t extra_word = in[FIRST_WORD_IDX + 2]; + word |= extra_word << USEFUL_BITS; + } + + return word & mask; +} + +template <typename OutType> +inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t dict_len, uint32_t idx, + OutType* __restrict__ out_val, + bool* __restrict__ decode_error) { + if (UNLIKELY(idx >= dict_len)) { + *decode_error = true; + } else { + // Use memcpy() because we can't assume sufficient alignment in some cases (e.g. + // 16 byte decimals). + memcpy(out_val, &dict[idx], sizeof(OutType)); + } +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::Unpack32Values(const uint8_t* __restrict__ in, int64_t in_bytes, + OutType* __restrict__ out) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; + constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + + // Call UnpackValue for 0 <= i < 32. 
+#pragma push_macro("UNPACK_VALUE_CALL") +#define UNPACK_VALUE_CALL(ignore1, i, ignore2) \ + out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i, true>(in)); + + BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore); + return in + BYTES_TO_READ; +#pragma pop_macro("UNPACK_VALUE_CALL") +} + +template <typename OutType> +const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ out) { +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case i: \ + return Unpack32Values<OutType, i>(in, in_bytes, out); + + switch (bit_width) { + // Expand cases from 0 to 64. + BOOST_PP_REPEAT_FROM_TO(0, 65, UNPACK_VALUES_CASE, ignore); + default: + DCHECK(false); + return in; + } +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in, int64_t in_bytes, + OutType* __restrict__ dict, int64_t dict_len, + OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high"); + constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + // TODO: this could be optimised further by using SIMD instructions. + // https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/ + + static_assert(BIT_WIDTH <= MAX_DICT_BITWIDTH, "Too high bit width for dictionary index."); + + // Call UnpackValue() and DecodeValue() for 0 <= i < 32. 
+#pragma push_macro("DECODE_VALUE_CALL") +#define DECODE_VALUE_CALL(ignore1, i, ignore2) \ + { \ + uint32_t idx = UnpackValue<BIT_WIDTH, i, true>(in); \ + uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + i * stride; \ + DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \ + } + + BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore); + return in + BYTES_TO_READ; +#pragma pop_macro("DECODE_VALUE_CALL") +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in, int64_t in_bytes, + int num_values, OutType* __restrict__ out) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; + constexpr int MAX_BATCH_SIZE = 31; + const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + DCHECK_LE(num_values, MAX_BATCH_SIZE); + + // Make sure the buffer is at least 1 byte. + constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ? (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1; + uint8_t tmp_buffer[TMP_BUFFER_SIZE]; + + const uint8_t* in_buffer = in; + // Copy into padded temporary buffer to avoid reading past the end of 'in' if the + // last 32-bit load would go past the end of the buffer. + if (BitUtil::round_up(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) { + memcpy(tmp_buffer, in, BYTES_TO_READ); + in_buffer = tmp_buffer; + } + +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case 31 - i: \ + out[30 - i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer)); + + // Use switch with fall-through cases to minimise branching. + switch (num_values) { + // Expand cases from 31 down to 1. 
+ BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore); + case 0: + break; + default: + DCHECK(false); + } + return in + BYTES_TO_READ; +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int num_values, + OutType* __restrict__ out, int64_t stride, + bool* __restrict__ decode_error) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= MAX_BITWIDTH, "BIT_WIDTH too high"); + constexpr int MAX_BATCH_SIZE = 31; + const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + DCHECK_LE(num_values, MAX_BATCH_SIZE); + + // Make sure the buffer is at least 1 byte. + constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ? (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1; + uint8_t tmp_buffer[TMP_BUFFER_SIZE]; + + const uint8_t* in_buffer = in; + // Copy into padded temporary buffer to avoid reading past the end of 'in' if the + // last 32-bit load would go past the end of the buffer. + if (BitUtil::round_up(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) { + memcpy(tmp_buffer, in, BYTES_TO_READ); + in_buffer = tmp_buffer; + } + +#pragma push_macro("DECODE_VALUES_CASE") +#define DECODE_VALUES_CASE(ignore1, i, ignore2) \ + case 31 - i: { \ + uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i, false>(in_buffer); \ + uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * stride; \ + DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \ + } + + // Use switch with fall-through cases to minimise branching. + switch (num_values) { + // Expand cases from 31 down to 1. 
+ BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore); + case 0: + break; + default: + DCHECK(false); + } + return in + BYTES_TO_READ; +#pragma pop_macro("DECODE_VALUES_CASE") +} + +} // namespace doris diff --git a/be/src/util/bit_stream_utils.h b/be/src/util/bit_stream_utils.h index 86922539b4..2316af997c 100644 --- a/be/src/util/bit_stream_utils.h +++ b/be/src/util/bit_stream_utils.h @@ -21,6 +21,7 @@ #pragma once #include "gutil/port.h" +#include "util/bit_packing.h" #include "util/bit_util.h" #include "util/faststring.h" @@ -146,4 +147,102 @@ private: int bit_offset_; // Offset in buffered_values_ }; +/// Utility class to read bit/byte stream. This class can read bits or bytes that are +/// either byte aligned or not. It also has utilities to read multiple bytes in one +/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient +/// processing of multiple values at a time. +class BatchedBitReader { +public: + /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'. + /// Does not take ownership of the buffer. + BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) { Reset(buffer, buffer_len); } + + BatchedBitReader() {} + + // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the + // two copies do not share any state. Invoking functions on either copy continues + // reading from the current read position without modifying the state of the other + // copy. + + /// Resets the read to start reading from the start of 'buffer'. The buffer's + /// length is 'buffer_len'. Does not take ownership of the buffer. + void Reset(const uint8_t* buffer, int64_t buffer_len) { + DCHECK(buffer != nullptr); + DCHECK_GE(buffer_len, 0); + buffer_pos_ = buffer; + buffer_end_ = buffer + buffer_len; + } + + /// Gets up to 'num_values' bit-packed values, starting from the current byte in the + /// buffer and advance the read position. 'bit_width' must be <= 64. 
+ /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bytes are + /// skipped and the next UnpackBatch() call will start reading from the next byte. + /// + /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the + /// total number of values the caller wants to read from a run of bit-packed values, or + /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always + /// satisfied if 'num_values' is a multiple of 32. + /// + /// The output type 'T' must be an unsigned integer. + /// + /// Returns the number of values read. + template <typename T> + int UnpackBatch(int bit_width, int num_values, T* v); + + /// Skip 'num_values_to_skip' bit-packed values. + /// 'num_values_to_skip * bit_width' is either divisible by 8, or + /// 'num_values_to_skip' equals to the count of the remaining bit-packed values. + bool SkipBatch(int bit_width, int num_values_to_skip); + + /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the + /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is + /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'. + /// Otherwise returns the number of values decoded. The values are written to 'v' with + /// a stride of 'stride' bytes. + template <typename T> + int UnpackAndDecodeBatch(int bit_width, T* dict, int64_t dict_len, int num_values, T* v, + int64_t stride); + + /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T + /// needs to be a little-endian native type and big enough to store 'num_bytes'. + /// Returns false if there are not enough bytes left. + template <typename T> + bool GetBytes(int num_bytes, T* v); + + /// Read an unsigned ULEB-128 encoded int from the stream. The encoded int must start + /// at the beginning of a byte. Return false if there were not enough bytes in the + /// buffer or the int is invalid. 
For more details on ULEB-128: + /// https://en.wikipedia.org/wiki/LEB128 + /// UINT_T must be an unsigned integer type. + template <typename UINT_T> + bool GetUleb128(UINT_T* v); + + /// Read a ZigZag encoded int from the stream. The encoded int must start at the + /// beginning of a byte. Return false if there were not enough bytes in the buffer or + /// the int is invalid. For more details on ZigZag encoding: + /// https://developers.google.com/protocol-buffers/docs/encoding#signed-integers + /// INT_T must be a signed integer type. + template <typename INT_T> + bool GetZigZagInteger(INT_T* v); + + /// Returns the number of bytes left in the stream. + int bytes_left() { return buffer_end_ - buffer_pos_; } + + /// Maximum byte length of a vlq encoded integer of type T. + template <typename T> + static constexpr int max_vlq_byte_len() { + return BitUtil::Ceil(sizeof(T) * 8, 7); + } + + /// Maximum supported bitwidth for reader. + static const int MAX_BITWIDTH = BitPacking::MAX_BITWIDTH; + +private: + /// Current read position in the buffer. + const uint8_t* buffer_pos_ = nullptr; + + /// Pointer to the byte after the end of the buffer. 
+ const uint8_t* buffer_end_ = nullptr; +}; + } // namespace doris diff --git a/be/src/util/bit_stream_utils.inline.h b/be/src/util/bit_stream_utils.inline.h index 05a5b8e2f3..c9ba2428ae 100644 --- a/be/src/util/bit_stream_utils.inline.h +++ b/be/src/util/bit_stream_utils.inline.h @@ -24,6 +24,7 @@ #include "glog/logging.h" #include "util/alignment.h" +#include "util/bit_packing.inline.h" #include "util/bit_stream_utils.h" using doris::BitUtil; @@ -208,4 +209,104 @@ inline bool BitReader::GetVlqInt(int32_t* v) { return true; } +template <typename T> +inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_LE(bit_width, sizeof(T) * 8); + DCHECK_GE(num_values, 0); + + int64_t num_read; + std::tie(buffer_pos_, num_read) = + BitPacking::UnpackValues(bit_width, buffer_pos_, bytes_left(), num_values, v); + DCHECK_LE(buffer_pos_, buffer_end_); + DCHECK_LE(num_read, num_values); + return static_cast<int>(num_read); +} + +inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GT(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_GT(num_values_to_skip, 0); + + int skip_bytes = BitUtil::RoundUpNumBytes(bit_width * num_values_to_skip); + if (skip_bytes > buffer_end_ - buffer_pos_) return false; + buffer_pos_ += skip_bytes; + return true; +} + +template <typename T> +inline int BatchedBitReader::UnpackAndDecodeBatch(int bit_width, T* dict, int64_t dict_len, + int num_values, T* v, int64_t stride) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_GE(num_values, 0); + + const uint8_t* new_buffer_pos; + int64_t num_read; + bool decode_error = false; + std::tie(new_buffer_pos, num_read) = + BitPacking::UnpackAndDecodeValues(bit_width, buffer_pos_, bytes_left(), dict, dict_len, + num_values, v, stride, 
&decode_error); + if (UNLIKELY(decode_error)) return -1; + buffer_pos_ = new_buffer_pos; + DCHECK_LE(buffer_pos_, buffer_end_); + DCHECK_LE(num_read, num_values); + return static_cast<int>(num_read); +} + +template <typename T> +inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(num_bytes, 0); + DCHECK_LE(num_bytes, sizeof(T)); + if (UNLIKELY(buffer_pos_ + num_bytes > buffer_end_)) return false; + *v = 0; // Ensure unset bytes are initialized to zero. + memcpy(v, buffer_pos_, num_bytes); + buffer_pos_ += num_bytes; + return true; +} + +template <typename UINT_T> +inline bool BatchedBitReader::GetUleb128(UINT_T* v) { + static_assert(std::is_integral<UINT_T>::value, "Integral type required."); + static_assert(std::is_unsigned<UINT_T>::value, "Unsigned type required."); + static_assert(!std::is_same<UINT_T, bool>::value, "Bools are not supported."); + + *v = 0; + int shift = 0; + uint8_t byte = 0; + do { + if (UNLIKELY(shift >= max_vlq_byte_len<UINT_T>() * 7)) return false; + if (!GetBytes(1, &byte)) return false; + + /// We need to convert 'byte' to UINT_T so that the result of the bitwise and + /// operation is at least as long an integer as '*v', otherwise the shift may be too + /// big and lead to undefined behaviour. + const UINT_T byte_as_UINT_T = byte; + *v |= (byte_as_UINT_T & 0x7Fu) << shift; + shift += 7; + } while ((byte & 0x80u) != 0); + return true; +} + +template <typename INT_T> +bool BatchedBitReader::GetZigZagInteger(INT_T* v) { + static_assert(std::is_integral<INT_T>::value, "Integral type required."); + static_assert(std::is_signed<INT_T>::value, "Signed type required."); + + using UINT_T = std::make_unsigned_t<INT_T>; + + UINT_T v_unsigned; + if (UNLIKELY(!GetUleb128<UINT_T>(&v_unsigned))) return false; + + /// Here we rely on implementation defined behaviour in converting UINT_T to INT_T. 
+ *v = (v_unsigned >> 1) ^ -(v_unsigned & 1u); + + return true; +} + } // namespace doris diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h index af50da4ef3..28534b139b 100644 --- a/be/src/util/bit_util.h +++ b/be/src/util/bit_util.h @@ -167,6 +167,9 @@ public: // Returns the rounded up to 64 multiple. Used for conversions of bits to i64. static inline uint32_t round_up_numi64(uint32_t bits) { return (bits + 63) >> 6; } + // Returns the rounded up to 32 multiple. Used for conversions of bits to i32. + constexpr static inline uint32_t round_up_numi32(uint32_t bits) { return (bits + 31) >> 5; } + #if __BYTE_ORDER == __LITTLE_ENDIAN // Converts to big endian format (if not already in big endian). static inline int64_t big_endian(int64_t value) { return byte_swap(value); } @@ -313,6 +316,11 @@ public: return (value + (factor - 1)) & ~(factor - 1); } + static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) { + DCHECK((factor > 0) && ((factor & (factor - 1)) == 0)); + return value & ~(factor - 1); + } + // Returns the ceil of value/divisor static inline int Ceil(int value, int divisor) { return value / divisor + (value % divisor != 0); diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index ec0c3ce411..1d80676b9b 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -17,6 +17,8 @@ #include "parquet_common.h" +#include "util/coding.h" + namespace doris::vectorized { Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding, @@ -24,6 +26,9 @@ Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type e switch (encoding) { case tparquet::Encoding::PLAIN: switch (type) { + case tparquet::Type::BOOLEAN: + decoder.reset(new BoolPlainDecoder()); + break; case tparquet::Type::INT32: decoder.reset(new PlainDecoder<Int32>()); break; @@ -36,6 +41,12 @@ Status 
Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type e case tparquet::Type::DOUBLE: decoder.reset(new PlainDecoder<Float64>()); break; + case tparquet::Type::BYTE_ARRAY: + decoder.reset(new BAPlainDecoder()); + break; + case tparquet::Type::FIXED_LEN_BYTE_ARRAY: + decoder.reset(new FixedLengthBAPlainDecoder()); + break; default: return Status::InternalError("Unsupported plain type {} in parquet decoder", tparquet::to_string(type)); @@ -49,12 +60,158 @@ Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type e return Status::OK(); } -MutableColumnPtr Decoder::getMutableColumnPtr(ColumnPtr& doris_column) { - // src column always be nullable for simple converting +Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) { CHECK(doris_column->is_nullable()); auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( (*std::move(doris_column)).mutate().get()); - return nullable_column->get_nested_column_ptr(); + MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + return _decode_values(data_column, num_values); +} + +Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t num_values) { + size_t to_read_bytes = _type_length * num_values; + if (UNLIKELY(_offset + to_read_bytes > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } + // insert '\0' into the end of each binary + if (UNLIKELY(to_read_bytes + num_values > slice.size)) { + return Status::IOError("Slice does not have enough space to write out the decoding data"); + } + uint32_t slice_offset = 0; + for (int i = 0; i < num_values; ++i) { + memcpy(slice.data + slice_offset, _data->data + _offset, _type_length); + slice_offset += _type_length + 1; + slice.data[slice_offset - 1] = '\0'; + _offset += _type_length; + } + return Status::OK(); +} + +Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) { + _offset += _type_length * num_values; + if 
(UNLIKELY(_offset > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } + return Status::OK(); } +Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, + size_t num_values) { + if (UNLIKELY(_offset + _type_length * num_values > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } + auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars(); + auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets(); + for (int i = 0; i < num_values; ++i) { + column_chars_t.insert(_data->data + _offset, _data->data + _offset + _type_length); + column_chars_t.emplace_back('\0'); + column_offsets.emplace_back(column_chars_t.size()); + _offset += _type_length; + } + return Status::OK(); +} + +Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) { + uint32_t slice_offset = 0; + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + if (UNLIKELY(_offset + length > _data->size)) { + return Status::IOError("Can't read enough bytes in plain decoder"); + } + memcpy(slice.data + slice_offset, _data->data + _offset, length); + slice_offset += length + 1; + slice.data[slice_offset - 1] = '\0'; + _offset += length; + } + return Status::OK(); +} + +Status BAPlainDecoder::skip_values(size_t num_values) { + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + if (UNLIKELY(_offset + length > _data->size)) { + return Status::IOError("Can't skip enough bytes in plain decoder"); + } + _offset
+= length; + } + return Status::OK(); +} + +Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) { + auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars(); + auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets(); + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + if (UNLIKELY(_offset + length > _data->size)) { + return Status::IOError("Can't read enough bytes in plain decoder"); + } + column_chars_t.insert(_data->data + _offset, _data->data + _offset + length); + column_chars_t.emplace_back('\0'); + column_offsets.emplace_back(column_chars_t.size()); + _offset += length; + } + return Status::OK(); +} + +Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) { + bool value; + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(!_decode_value(&value))) { + return Status::IOError("Can't read enough booleans in plain decoder"); + } + slice.data[i] = value ?
1 : 0; + } + return Status::OK(); +} + +Status BoolPlainDecoder::skip_values(size_t num_values) { + int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values); + unpacked_value_idx_ += skip_cached; + if (skip_cached == num_values) { + return Status::OK(); + } + int num_remaining = num_values - skip_cached; + int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32); + if (num_to_skip > 0) { + bool_values_.SkipBatch(1, num_to_skip); + } + num_remaining -= num_to_skip; + if (num_remaining > 0) { + DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN); + num_unpacked_values_ = + bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + if (UNLIKELY(num_unpacked_values_ < num_remaining)) { + return Status::IOError("Can't skip enough booleans in plain decoder"); + } + unpacked_value_idx_ = num_remaining; + } + return Status::OK(); +} + +Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) { + auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data(); + bool value; + for (int i = 0; i < num_values; ++i) { + if (UNLIKELY(!_decode_value(&value))) { + return Status::IOError("Can't read enough booleans in plain decoder"); + } + column_data.emplace_back(value); + } + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 0f0e9f6e3d..44523ae22d 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -21,8 +21,10 @@ #include "common/status.h" #include "gen_cpp/parquet_types.h" +#include "util/bit_stream_utils.inline.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" namespace doris::vectorized { @@ -36,25 +38,25 @@ public: static Status getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding, 
std::unique_ptr<Decoder>& decoder); - static MutableColumnPtr getMutableColumnPtr(ColumnPtr& doris_column); - // The type with fix length void set_type_length(int32_t type_length) { _type_length = type_length; } // Set the data to be decoded - void set_data(Slice* data) { + virtual void set_data(Slice* data) { _data = data; _offset = 0; } // Write the decoded values batch to doris's column - virtual Status decode_values(ColumnPtr& doris_column, size_t num_values) = 0; + Status decode_values(ColumnPtr& doris_column, size_t num_values); virtual Status decode_values(Slice& slice, size_t num_values) = 0; virtual Status skip_values(size_t num_values) = 0; protected: + virtual Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) = 0; + int32_t _type_length; Slice* _data = nullptr; uint32_t _offset = 0; @@ -66,19 +68,6 @@ public: PlainDecoder() = default; ~PlainDecoder() override = default; - Status decode_values(ColumnPtr& doris_column, size_t num_values) override { - size_t to_read_bytes = TYPE_LENGTH * num_values; - if (UNLIKELY(_offset + to_read_bytes > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); - } - auto data_column = getMutableColumnPtr(doris_column); - auto& column_data = static_cast<ColumnVector<T>&>(*data_column).get_data(); - const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset); - column_data.insert(raw_data, raw_data + num_values); - _offset += to_read_bytes; - return Status::OK(); - } - Status decode_values(Slice& slice, size_t num_values) override { size_t to_read_bytes = TYPE_LENGTH * num_values; if (UNLIKELY(_offset + to_read_bytes > _data->size)) { @@ -103,6 +92,98 @@ public: protected: enum { TYPE_LENGTH = sizeof(T) }; + + Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override { + size_t to_read_bytes = TYPE_LENGTH * num_values; + if (UNLIKELY(_offset + to_read_bytes > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet 
data decoder"); + } + auto& column_data = static_cast<ColumnVector<T>&>(*doris_column).get_data(); + const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset); + column_data.insert(raw_data, raw_data + num_values); + _offset += to_read_bytes; + return Status::OK(); + } +}; + +class FixedLengthBAPlainDecoder final : public Decoder { +public: + FixedLengthBAPlainDecoder() = default; + ~FixedLengthBAPlainDecoder() override = default; + + Status decode_values(Slice& slice, size_t num_values) override; + + Status skip_values(size_t num_values) override; + +protected: + Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override; +}; + +class BAPlainDecoder final : public Decoder { +public: + BAPlainDecoder() = default; + ~BAPlainDecoder() override = default; + + Status decode_values(Slice& slice, size_t num_values) override; + + Status skip_values(size_t num_values) override; + +protected: + Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override; +}; + +/// Decoder bit-packed boolean-encoded values. 
+/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h +class BoolPlainDecoder final : public Decoder { +public: + BoolPlainDecoder() = default; + ~BoolPlainDecoder() override = default; + + // Set the data to be decoded + void set_data(Slice* data) override { + bool_values_.Reset((const uint8_t*)data->data, data->size); + num_unpacked_values_ = 0; + unpacked_value_idx_ = 0; + _offset = 0; + } + + Status decode_values(Slice& slice, size_t num_values) override; + + Status skip_values(size_t num_values) override; + +protected: + inline bool _decode_value(bool* value) { + if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) { + *value = unpacked_values_[unpacked_value_idx_++]; + } else { + num_unpacked_values_ = + bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + if (UNLIKELY(num_unpacked_values_ == 0)) { + return false; + } + *value = unpacked_values_[0]; + unpacked_value_idx_ = 1; + } + return true; + } + + Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override; + + /// A buffer to store unpacked values. Must be a multiple of 32 size to use the + /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because + /// bit unpacking is only supported for unsigned integers. The values are converted to + /// bool when returned to the user. + static const int UNPACKED_BUFFER_LEN = 128; + uint8_t unpacked_values_[UNPACKED_BUFFER_LEN]; + + /// The number of valid values in 'unpacked_values_'. + int num_unpacked_values_ = 0; + + /// The next value to return from 'unpacked_values_'. + int unpacked_value_idx_ = 0; + + /// Bit packed decoder, used if 'encoding_' is PLAIN. 
+ BatchedBitReader bool_values_; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 11d6bfaf94..d4c7f534a3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -86,7 +86,15 @@ Status ColumnChunkReader::load_page_data() { if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) { encoding = tparquet::Encoding::RLE_DICTIONARY; } - Decoder::getDecoder(_metadata.type, encoding, _page_decoder); + // Reuse page decoder + if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) { + _page_decoder = _decoders[static_cast<int>(encoding)].get(); + } else { + std::unique_ptr<Decoder> page_decoder; + Decoder::getDecoder(_metadata.type, encoding, page_decoder); + _decoders[static_cast<int>(encoding)] = std::move(page_decoder); + _page_decoder = _decoders[static_cast<int>(encoding)].get(); + } _page_decoder->set_data(&_page_data); if (_type_length > 0) { _page_decoder->set_type_length(_type_length); @@ -122,12 +130,12 @@ Status ColumnChunkReader::skip_values(size_t num_values) { size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) { DCHECK_GT(_max_rep_level, 0); - return _def_level_decoder.get_levels(levels, n); + return _rep_level_decoder.get_levels(levels, n); } size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) { DCHECK_GT(_max_def_level, 0); - return _rep_level_decoder.get_levels(levels, n); + return _def_level_decoder.get_levels(levels, n); } Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t num_values) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index c3b58be5c0..282612bd21 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -127,7 +127,10 @@ private: Slice _page_data; std::unique_ptr<uint8_t[]> _decompress_buf; size_t _decompress_buf_size = 0; - std::unique_ptr<Decoder> _page_decoder = nullptr; + Decoder* _page_decoder = nullptr; + // Map: encoding -> Decoder + // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding + std::unordered_map<int, std::unique_ptr<Decoder>> _decoders; size_t _type_length = -1; }; diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet index e4b5820161..5f679c0005 100644 Binary files a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet and b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 02bcef6261..db91103c88 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -124,58 +124,147 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) { ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4); } -TEST_F(ParquetThriftReaderTest, column_reader) { - // type-decoder.parquet is the part of following table: - // create table type-decoder ( - // int_col int) - // TODO(gaoxin): add more hive types +static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk, + FieldSchema* field_schema, Slice& slice) { + tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data; + size_t start_offset = chunk_meta.__isset.dictionary_page_offset + ? 
chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; + size_t chunk_size = chunk_meta.total_compressed_size; + BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size); + + ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema); + // initialize chunk reader + chunk_reader.init(); + // seek to next page header + chunk_reader.next_page(); + // load page data into underlying container + chunk_reader.load_page_data(); + // decode page data + return chunk_reader.decode_values(slice, chunk_reader.num_values()); +} + +TEST_F(ParquetThriftReaderTest, type_decoder) { + /* + * type-decoder.parquet is the part of following table: + * create table `type_decoder`( + * `tinyint_col` tinyint, // 0 + * `smallint_col` smallint, // 1 + * `int_col` int, // 2 + * `bigint_col` bigint, // 3 + * `boolean_col` boolean, // 4 + * `float_col` float, // 5 + * `double_col` double, // 6 + * `string_col` string, // 7 + * `binary_col` binary, // 8 + * `timestamp_col` timestamp, // 9 + * `decimal_col` decimal(10,2), // 10 + * `char_col` char(10), // 11 + * `varchar_col` varchar(50), // 12 + * `date_col` date, // 13 + * `list_string` array<string>) // 14 + */ LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + /* + * Data in type-decoder.parquet: + * -1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0 2022-08-01 00:00:00 -1.14 c-row0 vc-row0 2022-08-01 ["as-0","as-1"] + * 2 2 2 2 true 2.14 2.14 NULL b-row1 2022-08-02 00:00:00 2.14 c-row1 vc-row1 2022-08-02 [null,"as-3"] + * -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2 2022-08-03 00:00:00 -3.14 c-row2 vc-row2 2022-08-03 [] + * 4 4 4 4 true 4.14 4.14 NULL b-row3 2022-08-04 00:00:00 4.14 c-row3 vc-row3 2022-08-04 ["as-4"] + * -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4 2022-08-05 00:00:00 -5.14 c-row4 vc-row4 2022-08-05 ["as-5",null] + * 6 6 6 6 false 6.14 6.14 s-row5 b-row5 2022-08-06 00:00:00 6.14 c-row5 vc-row5 2022-08-06 [null,null] + * -7 -7 -7 -7 true -7.14 
-7.14 s-row6 b-row6 2022-08-07 00:00:00 -7.14 c-row6 vc-row6 2022-08-07 ["as-6","as-7"] + * 8 8 8 8 false 8.14 8.14 NULL b-row7 2022-08-08 00:00:00 8.14 c-row7 vc-row7 2022-08-08 ["as-0","as-8"] + * -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8 2022-08-09 00:00:00 -9.14 c-row8 vc-row8 2022-08-09 ["as-9","as-10"] + * 10 10 10 10 false 10.14 10.14 s-row9 b-row9 2022-08-10 00:00:00 10.14 c-row9 vc-row9 2022-08-10 ["as-11","as-12"] + */ auto st = reader.open(); EXPECT_TRUE(st.ok()); std::shared_ptr<FileMetaData> metaData; parse_thrift_footer(&reader, metaData); tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); - - // read the `int_col` column, it's the int-type column, and has ten values: - // -1, 2, -3, 4, -5, 6, -7, 8, -9, 10 - tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[0]; - tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data; - size_t start_offset = chunk_meta.__isset.dictionary_page_offset - ? chunk_meta.dictionary_page_offset - : chunk_meta.data_page_offset; - size_t chunk_size = chunk_meta.total_compressed_size; - BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size); - FieldDescriptor schema_descriptor; schema_descriptor.parse_from_thrift(t_metadata.schema); - auto field_schema = const_cast<FieldSchema*>(schema_descriptor.get_column(0)); + int rows = 10; - ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, field_schema); - size_t batch_size = 10; - size_t int_length = 4; - char data[batch_size * int_length]; - Slice slice(data, batch_size * int_length); - chunk_reader.init(); - uint64_t int_sum = 0; - while (chunk_reader.has_next_page()) { + // the physical_type of tinyint_col, smallint_col and int_col are all INT32 + // they are distinguished by converted_type(in FieldSchema.parquet_schema.converted_type) + for (int col_idx = 0; col_idx < 3; ++col_idx) { + char data[4 * rows]; + Slice slice(data, 4 * rows); + get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx], 
+ const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice); + auto out_data = reinterpret_cast<int32_t*>(data); + int int_sum = 0; + for (int i = 0; i < rows; ++i) { + int_sum += out_data[i]; + } + ASSERT_EQ(int_sum, 5); + } + // `bigint_col` bigint, // 3 + { + char data[8 * rows]; + Slice slice(data, 8 * rows); + get_column_values(&reader, &t_metadata.row_groups[0].columns[3], + const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice); + auto out_data = reinterpret_cast<int64_t*>(data); + int int_sum = 0; + for (int i = 0; i < rows; ++i) { + int_sum += out_data[i]; + } + ASSERT_EQ(int_sum, 5); + } + // `boolean_col` boolean, // 4 + { + char data[1 * rows]; + Slice slice(data, 1 * rows); + get_column_values(&reader, &t_metadata.row_groups[0].columns[4], + const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice); + auto out_data = reinterpret_cast<bool*>(data); + ASSERT_FALSE(out_data[0]); + ASSERT_TRUE(out_data[1]); + ASSERT_FALSE(out_data[2]); + ASSERT_TRUE(out_data[3]); + ASSERT_FALSE(out_data[4]); + ASSERT_FALSE(out_data[5]); + ASSERT_TRUE(out_data[6]); + ASSERT_FALSE(out_data[7]); + ASSERT_FALSE(out_data[8]); + ASSERT_FALSE(out_data[9]); + } + // `string_col` string, // 7 + { + tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[7]; + tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data; + size_t start_offset = chunk_meta.__isset.dictionary_page_offset + ? 
chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; + size_t chunk_size = chunk_meta.total_compressed_size; + BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size); + + ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, + const_cast<FieldSchema*>(schema_descriptor.get_column(7))); + // initialize chunk reader + chunk_reader.init(); // seek to next page header chunk_reader.next_page(); - // load data to decoder + // load page data into underlying container chunk_reader.load_page_data(); - while (chunk_reader.num_values() > 0) { - size_t num_values = chunk_reader.num_values() < batch_size - ? chunk_reader.num_values() < batch_size - : batch_size; - chunk_reader.decode_values(slice, num_values); - auto out_data = reinterpret_cast<Int32*>(slice.data); - for (int i = 0; i < num_values; i++) { - Int32 value = out_data[i]; - int_sum += value; - } - } + + char data[50 * rows]; + Slice slice(data, 50 * rows); + level_t defs[rows]; + // Analyze null string + chunk_reader.get_def_levels(defs, rows); + ASSERT_EQ(defs[1], 0); + ASSERT_EQ(defs[3], 0); + ASSERT_EQ(defs[7], 0); + + chunk_reader.decode_values(slice, 7); + ASSERT_STREQ("s-row0", slice.data); + ASSERT_STREQ("s-row2", slice.data + 7); } - ASSERT_EQ(int_sum, 5); } } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org