This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0b5bb565a7 [feature-wip](parquet-reader) parquet dictionary decoder (#11981) 0b5bb565a7 is described below commit 0b5bb565a7497463e79e1efce7f6739e48b1deba Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Fri Aug 26 19:24:37 2022 +0800 [feature-wip](parquet-reader) parquet dictionary decoder (#11981) Parse parquet data with dictionary encoding. Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification. Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files. refer: https://github.com/apache/parquet-format/blob/master/Encodings.md --- be/src/util/rle_encoding.h | 291 +++++++++++++++++++ be/src/vec/exec/format/parquet/parquet_common.cpp | 176 +++++++----- be/src/vec/exec/format/parquet/parquet_common.h | 151 ++++++---- .../parquet/vparquet_column_chunk_reader.cpp | 56 +++- .../format/parquet/vparquet_column_chunk_reader.h | 4 +- .../test_data/parquet_scanner/dict-decoder.parquet | Bin 0 -> 4282 bytes .../test_data/parquet_scanner/dict-decoder.txt | 16 ++ .../test_data/parquet_scanner/type-decoder.txt | 14 + be/test/vec/exec/parquet/parquet_thrift_test.cpp | 315 ++++++--------------- 9 files changed, 657 insertions(+), 366 deletions(-) diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h index 08a7a23a4d..1409473a09 100644 --- a/be/src/util/rle_encoding.h +++ b/be/src/util/rle_encoding.h @@ -568,4 +568,295 @@ inline void RleEncoder<T>::Clear() { bit_writer_.Clear(); } +// Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h +// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs +// are sufficiently long, RLE is used, otherwise, the values are just bit-packed +// (literal encoding). +// +// For both types of runs, there is a byte-aligned indicator which encodes the length +// of the run and the type of the run. 
+// +// This encoding has the benefit that when there aren't any long enough runs, values +// are always decoded at fixed (can be precomputed) bit offsets OR both the value and +// the run length are byte aligned. This allows for very efficient decoding +// implementations. +// The encoding is: +// encoded-block := run* +// run := literal-run | repeated-run +// literal-run := literal-indicator < literal bytes > +// repeated-run := repeated-indicator < repeated value. padded to byte boundary > +// literal-indicator := varint_encode( number_of_groups << 1 | 1) +// repeated-indicator := varint_encode( number_of_repetitions << 1 ) +// +// Each run is preceded by a varint. The varint's least significant bit is +// used to indicate whether the run is a literal run or a repeated run. The rest +// of the varint is used to determine the length of the run (eg how many times the +// value repeats). +// +// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode +// in groups of 8), so that no matter the bit-width of the value, the sequence will end +// on a byte boundary without padding. +// Given that we know it is a multiple of 8, we store the number of 8-groups rather than +// the actual number of encoded ints. (This means that the total number of encoded values +// can not be determined from the encoded data, since the number of values in the last +// group may not be a multiple of 8). For the last group of literal runs, we pad +// the group to 8 with zeros. This allows for 8 at a time decoding on the read side +// without the need for additional checks. +// +// There is a break-even point when it is more storage efficient to do run length +// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes +// for both the repeated encoding or the literal encoding. This value can always +// be computed based on the bit-width. 
+// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more +// investigation is needed to do this efficiently, see the reverted IMPALA-6658. +// TODO: think about how to use this for strings. The bit packing isn't quite the same. +// +// Examples with bit-width 1 (eg encoding booleans): +// ---------------------------------------- +// 100 1s followed by 100 0s: +// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> +// - (total 4 bytes) +// +// alternating 1s and 0s (200 total): +// 200 ints = 25 groups of 8 +// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> +// (total 26 bytes, 1 byte overhead) + +// RLE decoder with a batch-oriented interface that enables fast decoding. +// Users of this class must first initialize the class to point to a buffer of +// RLE-encoded data, passed into the constructor or Reset(). The provided +// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). +// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to +// see if the next run is a repeated or literal run, then calling +// GetRepeatedValue() or GetLiteralValues() respectively to read the values. +// +// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. +// Other decoding errors are signalled by functions returning false. If an +// error is encountered then it is not valid to read any more data until +// Reset() is called. + +template <typename T> +class RleBatchDecoder { +public: + RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { + Reset(buffer, buffer_len, bit_width); + } + + RleBatchDecoder() = default; + + // Reset the decoder to read from a new buffer. + void Reset(uint8_t* buffer, int buffer_len, int bit_width); + + // Return the size of the current repeated run. Returns zero if the current run is + // a literal run or if no more runs can be read from the input. 
+ int32_t NextNumRepeats(); + + // Get the value of the current repeated run and consume the given number of repeats. + // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot + // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' + // can be set to 0 to peek at the value without consuming repeats. + T GetRepeatedValue(int32_t num_repeats_to_consume); + + // Return the size of the current literal run. Returns zero if the current run is + // a repeated run or if no more runs can be read from the input. + int32_t NextNumLiterals(); + + // Consume 'num_literals_to_consume' literals from the current literal run, + // copying the values to 'values'. 'num_literals_to_consume' must be <= + // NextNumLiterals(). Returns true if the requested number of literals were + // successfully read or false if an error was encountered, e.g. the input was + // truncated. + bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; + + // Consume up to 'batch_num' values and copy them to 'values'. + // Returns the number of consumed values or 0 if an error occurred. + int32_t GetBatch(T* values, int32_t batch_num); + +private: + // Called when both 'literal_count_' and 'repeat_count_' have been exhausted. + // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal + // or repeated run, or leaves both at 0 if no more values can be read (either because + // the end of the input was reached or an error was encountered decoding). + void NextCounts(); + + /// Fill the literal buffer. Invalid to call if there are already buffered literals. + /// Return false if the input was truncated. This does not advance 'literal_count_'. + bool FillLiteralBuffer() WARN_UNUSED_RESULT; + + bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; } + + /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing + /// 'literal_count_'. 
Returns the number of literals outputted. + int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); + + BatchedBitReader bit_reader_; + + // Number of bits needed to encode the value. Must be between 0 and 64 after + // the decoder is initialized with a buffer. -1 indicates the decoder was not + // initialized. + int bit_width_ = -1; + + // If a repeated run, the number of repeats remaining in the current run to be read. + // If the current run is a literal run, this is 0. + int32_t repeat_count_ = 0; + + // If a literal run, the number of literals remaining in the current run to be read. + // If the current run is a repeated run, this is 0. + int32_t literal_count_ = 0; + + // If a repeated run, the current repeated value. + T repeated_value_; + + // Size of buffer for literal values. Large enough to decode a full batch of 32 + // literals. The buffer is needed to allow clients to read in batches that are not + // multiples of 32. + static constexpr int LITERAL_BUFFER_LEN = 32; + + // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the + // position of the next literal to be read from the buffer. 
+ T literal_buffer_[LITERAL_BUFFER_LEN]; + int num_buffered_literals_ = 0; + int literal_buffer_pos_ = 0; +}; + +template <typename T> +inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) { + int32_t num_to_output = + std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); + memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); + literal_buffer_pos_ += num_to_output; + literal_count_ -= num_to_output; + return num_to_output; +} + +template <typename T> +inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { + bit_reader_.Reset(buffer, buffer_len); + bit_width_ = bit_width; + repeat_count_ = 0; + literal_count_ = 0; + num_buffered_literals_ = 0; + literal_buffer_pos_ = 0; +} + +template <typename T> +inline int32_t RleBatchDecoder<T>::NextNumRepeats() { + if (repeat_count_ > 0) return repeat_count_; + if (literal_count_ == 0) NextCounts(); + return repeat_count_; +} + +template <typename T> +inline void RleBatchDecoder<T>::NextCounts() { + // Read the next run's indicator int, it could be a literal or repeated run. + // The int is encoded as a ULEB128-encoded value. + uint32_t indicator_value = 0; + if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) { + return; + } + + // lsb indicates if it is a literal run or repeated run + bool is_literal = indicator_value & 1; + + // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. + // The Parquet standard does not allow longer runs - see PARQUET-1290. + uint32_t run_len = indicator_value >> 1; + if (is_literal) { + // Use int64_t to avoid overflowing multiplication. 
+ int64_t literal_count = static_cast<int64_t>(run_len) * 8; + if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; + literal_count_ = literal_count; + } else { + if (UNLIKELY(run_len == 0)) return; + bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); + if (UNLIKELY(!result)) return; + repeat_count_ = run_len; + } +} + +template <typename T> +inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { + repeat_count_ -= num_repeats_to_consume; + return repeated_value_; +} + +template <typename T> +inline int32_t RleBatchDecoder<T>::NextNumLiterals() { + if (literal_count_ > 0) return literal_count_; + if (repeat_count_ == 0) NextCounts(); + return literal_count_; +} + +template <typename T> +inline bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) { + int32_t num_consumed = 0; + // Copy any buffered literals left over from previous calls. + if (HaveBufferedLiterals()) { + num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); + } + + int32_t num_remaining = num_literals_to_consume - num_consumed; + // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. + // Need to round to a batch of 32 if the caller is consuming only part of the current + // run to avoid ending on a non-byte boundary. + int32_t num_to_bypass = + std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); + if (num_to_bypass > 0) { + int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); + // If we couldn't read the expected number, that means the input was truncated. + if (num_read < num_to_bypass) return false; + literal_count_ -= num_to_bypass; + num_consumed += num_to_bypass; + num_remaining = num_literals_to_consume - num_consumed; + } + + if (num_remaining > 0) { + // We weren't able to copy all the literals requested directly from the input. 
+ // Buffer literals and copy over the requested number. + if (UNLIKELY(!FillLiteralBuffer())) return false; + OutputBufferedLiterals(num_remaining, values + num_consumed); + } + return true; +} + +template <typename T> +inline bool RleBatchDecoder<T>::FillLiteralBuffer() { + int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); + num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); + // If we couldn't read the expected number, that means the input was truncated. + if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; + literal_buffer_pos_ = 0; + return true; +} + +template <typename T> +inline int32_t RleBatchDecoder<T>::GetBatch(T* values, int32_t batch_num) { + int32_t num_consumed = 0; + while (num_consumed < batch_num) { + // Add RLE encoded values by repeating the current value this number of times. + int32_t num_repeats = NextNumRepeats(); + if (num_repeats > 0) { + int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed); + T repeated_value = GetRepeatedValue(num_repeats_to_set); + for (int i = 0; i < num_repeats_to_set; ++i) { + values[num_consumed + i] = repeated_value; + } + num_consumed += num_repeats_to_set; + continue; + } + + // Add remaining literal values, if any. 
+ int32_t num_literals = NextNumLiterals(); + if (num_literals == 0) { + break; + } + int32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed); + if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { + return 0; + } + num_consumed += num_literals_to_set; + } + return num_consumed; +} + } // namespace doris diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index be4ec35223..347db41d86 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -44,12 +44,16 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type std::unique_ptr<Decoder>& decoder) { switch (encoding) { case tparquet::Encoding::PLAIN: + case tparquet::Encoding::RLE_DICTIONARY: switch (type) { case tparquet::Type::BOOLEAN: + if (encoding != tparquet::Encoding::PLAIN) { + return Status::InternalError("Bool type can't has dictionary page"); + } decoder.reset(new BoolPlainDecoder()); break; case tparquet::Type::BYTE_ARRAY: - decoder.reset(new ByteArrayPlainDecoder()); + decoder.reset(new ByteArrayDecoder()); break; case tparquet::Type::INT32: case tparquet::Type::INT64: @@ -57,14 +61,12 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type case tparquet::Type::FLOAT: case tparquet::Type::DOUBLE: case tparquet::Type::FIXED_LEN_BYTE_ARRAY: - decoder.reset(new PlainDecoder(type)); + decoder.reset(new FixLengthDecoder(type)); break; default: - return Status::InternalError("Unsupported plain type {} in parquet decoder", + return Status::InternalError("Unsupported type {} in parquet decoder", tparquet::to_string(type)); } - case tparquet::Encoding::RLE_DICTIONARY: - break; default: return Status::InternalError("Unsupported encoding {} in parquet decoder", tparquet::to_string(encoding)); @@ -118,39 +120,55 @@ Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, s return 
decode_values(data_column, data_type, num_values); } -Status PlainDecoder::decode_values(Slice& slice, size_t num_values) { - size_t to_read_bytes = _type_length * num_values; - if (UNLIKELY(to_read_bytes > slice.size)) { - return Status::IOError("Slice does not have enough space to write out the decoding data"); - } - memcpy(slice.data, _data->data + _offset, to_read_bytes); - _offset += to_read_bytes; +Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) { + _has_dict = true; + _dict = std::move(dict); return Status::OK(); } -Status PlainDecoder::skip_values(size_t num_values) { - _offset += _type_length * num_values; - if (UNLIKELY(_offset > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); +void FixLengthDecoder::set_data(Slice* data) { + _data = data; + _offset = 0; + if (_has_dict) { + uint8_t bit_width = *data->data; + _index_batch_decoder.reset( + new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1, + static_cast<int>(data->size) - 1, bit_width)); + } +} + +Status FixLengthDecoder::skip_values(size_t num_values) { + if (_has_dict) { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + } else { + _offset += _type_length * num_values; + if (UNLIKELY(_offset > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } } return Status::OK(); } -Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values, - size_t real_length) { +Status FixLengthDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values, + size_t real_length) { if (UNLIKELY(_physical_type != tparquet::Type::INT32)) { return Status::InternalError("Short int can only be decoded from INT32"); } for (int i = 0; i < num_values; ++i) { - doris_column->insert_data(_data->data + _offset, real_length); - _offset += _type_length; + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + 
doris_column->insert_data(buf_start, real_length); + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } -Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - size_t num_values) { - if (UNLIKELY(_offset + _type_length * num_values > _data->size)) { +Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values) { + if (_has_dict) { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + } else if (UNLIKELY(_offset + _type_length * num_values > _data->size)) { return Status::IOError("Out-of-bounds access in parquet data decoder"); } TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); @@ -228,8 +246,9 @@ Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& case TypeIndex::FixedString: if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { for (int i = 0; i < num_values; ++i) { - doris_column->insert_data(_data->data + _offset, _type_length); - _offset += _type_length; + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + doris_column->insert_data(buf_start, _type_length); + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } @@ -243,48 +262,37 @@ Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& getTypeName(data_type->get_type_id())); } -Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) { - uint32_t slice_offset = 0; - for (int i = 0; i < num_values; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); - } - uint32_t length = - decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); - _offset += 4; - if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't read enough bytes in plain decoder"); - } - memcpy(slice.data + slice_offset, _data->data + _offset, length); - slice_offset += length + 1; - slice.data[slice_offset - 
1] = '\0'; - _offset += length; +Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) { + _has_dict = true; + _dict = std::move(dict); + _dict_offsets.resize(dict_size + 1); + uint32_t offset_cursor = 0; + for (int i = 0; i < dict_size; ++i) { + uint32_t length = decode_fixed32_le(_dict.get() + offset_cursor); + offset_cursor += 4; + _dict_offsets[i] = offset_cursor; + offset_cursor += length; } + _dict_offsets[dict_size] = offset_cursor + 4; return Status::OK(); } -Status ByteArrayPlainDecoder::skip_values(size_t num_values) { - for (int i = 0; i < num_values; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); - } - uint32_t length = - decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); - _offset += 4; - if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't skip enough bytes in plain decoder"); - } - _offset += length; +void ByteArrayDecoder::set_data(Slice* data) { + _data = data; + _offset = 0; + if (_has_dict) { + uint8_t bit_width = *data->data; + _index_batch_decoder.reset( + new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1, + static_cast<int>(data->size) - 1, bit_width)); } - return Status::OK(); } -Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - size_t num_values) { - TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); - switch (logical_type) { - case TypeIndex::String: - case TypeIndex::FixedString: +Status ByteArrayDecoder::skip_values(size_t num_values) { + if (_has_dict) { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + } else { for (int i = 0; i < num_values; ++i) { if (UNLIKELY(_offset + 4 > _data->size)) { return Status::IOError("Can't read byte array length from plain decoder"); @@ -293,11 +301,44 @@ Status 
ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); _offset += 4; if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't read enough bytes in plain decoder"); + return Status::IOError("Can't skip enough bytes in plain decoder"); } - doris_column->insert_data(_data->data + _offset, length); _offset += length; } + } + return Status::OK(); +} + +Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + size_t num_values) { + if (_has_dict) { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + } + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { + case TypeIndex::String: + case TypeIndex::FixedString: + for (int i = 0; i < num_values; ++i) { + if (_has_dict) { + uint32_t idx = _indexes[i]; + uint32_t idx_cursor = _dict_offsets[idx]; + char* buff_start = reinterpret_cast<char*>(_dict.get() + idx_cursor); + doris_column->insert_data(buff_start, _dict_offsets[idx + 1] - idx_cursor - 4); + } else { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + uint32_t length = + decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + if (UNLIKELY(_offset + length) > _data->size) { + return Status::IOError("Can't read enough bytes in plain decoder"); + } + doris_column->insert_data(_data->data + _offset, length); + _offset += length; + } + } return Status::OK(); case TypeIndex::Decimal32: return _decode_binary_decimal<Int32>(doris_column, data_type, num_values); @@ -313,17 +354,6 @@ Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data getTypeName(data_type->get_type_id())); } -Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) { - bool value; - for (int i = 0; i < num_values; ++i) 
{ - if (UNLIKELY(!_decode_value(&value))) { - return Status::IOError("Can't read enough booleans in plain decoder"); - } - slice.data[i] = value ? 1 : 0; - } - return Status::OK(); -} - Status BoolPlainDecoder::skip_values(size_t num_values) { int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values); unpacked_value_idx_ += skip_cached; diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 3620721ddd..9504fd32b5 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -25,6 +25,7 @@ #include "schema_desc.h" #include "util/bit_stream_utils.inline.h" #include "util/coding.h" +#include "util/rle_encoding.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" @@ -110,10 +111,12 @@ public: virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) = 0; - virtual Status decode_values(Slice& slice, size_t num_values) = 0; - virtual Status skip_values(size_t num_values) = 0; + virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) { + return Status::NotSupported("set_dict is not supported"); + } + protected: int32_t _type_length; Slice* _data = nullptr; @@ -146,33 +149,25 @@ void Decoder::init_decimal_converter(DataTypePtr& data_type) { } } -class PlainDecoder final : public Decoder { +class FixLengthDecoder final : public Decoder { public: - PlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}; - ~PlainDecoder() override = default; + FixLengthDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}; + ~FixLengthDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) override; - Status decode_values(Slice& slice, size_t num_values) override; - Status skip_values(size_t 
num_values) override; + Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override; + + void set_data(Slice* data) override; + protected: Status _decode_short_int(MutableColumnPtr& doris_column, size_t num_values, size_t real_length); template <typename Numeric> - Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) { - size_t to_read_bytes = _type_length * num_values; - if (UNLIKELY(_offset + to_read_bytes > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); - } - auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); - const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset); - column_data.insert(raw_data, raw_data + num_values); - _offset += to_read_bytes; - return Status::OK(); - } + Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values); template <typename CppType, typename ColumnType> Status _decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, size_t num_values); @@ -193,16 +188,44 @@ protected: Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values); +#define _FIXED_GET_DATA_OFFSET(index) \ + _has_dict ? 
reinterpret_cast<char*>(_dict.get() + _indexes[index] * _type_length) \ + : _data->data + _offset + +#define _FIXED_SHIFT_DATA_OFFSET() \ + if (!_has_dict) _offset += _type_length + tparquet::Type::type _physical_type; + // For dictionary encoding + bool _has_dict = false; + std::unique_ptr<uint8_t[]> _dict = nullptr; + std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; + std::vector<uint32_t> _indexes; }; +template <typename Numeric> +Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column, size_t num_values) { + if (_has_dict) { + for (int i = 0; i < num_values; ++i) { + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + doris_column->insert_data(buf_start, _type_length); + } + } else { + auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data(); + const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset); + column_data.insert(raw_data, raw_data + num_values); + _offset += _type_length * num_values; + } + return Status::OK(); +} + template <typename CppType, typename ColumnType> -Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, - size_t num_values) { +Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); for (int i = 0; i < num_values; ++i) { - int64_t date_value = - static_cast<int64_t>(*reinterpret_cast<int32_t*>(_data->data + _offset)); + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + int64_t date_value = static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start)); CppType v; v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { @@ -211,17 +234,18 @@ Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& log } ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); 
column_data.emplace_back(cast_value); - _offset += _type_length; + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } template <typename CppType, typename ColumnType> -Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type, - size_t num_values) { +Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); for (int i = 0; i < num_values; i++) { - int64_t& date_value = *reinterpret_cast<int64_t*>(_data->data + _offset); + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start); CppType v; v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { @@ -231,17 +255,18 @@ Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeInde } ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); column_data.emplace_back(cast_value); - _offset += _type_length; + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } template <typename CppType, typename ColumnType> -Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type, - size_t num_values) { +Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type, + size_t num_values) { auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data(); for (int i = 0; i < num_values; ++i) { - ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(_data->data + _offset); + char* buf_start = _FIXED_GET_DATA_OFFSET(i); + ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(buf_start); CppType v; int64_t micros = datetime96.to_timestamp_micros(); v.from_unixtime(micros / 1000000, *_decode_params->ctz); @@ -252,20 +277,20 @@ Status 
PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeInde } ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v); column_data.emplace_back(cast_value); - _offset += _type_length; + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } template <typename DecimalPrimitiveType> -Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, - size_t num_values) { +Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, size_t num_values) { init_decimal_converter<DecimalPrimitiveType>(data_type); auto& column_data = static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); DecimalScaleParams& scale_params = _decode_params->decimal_scale; for (int i = 0; i < num_values; ++i) { - char* buf_start = _data->data + _offset; + char* buf_start = _FIXED_GET_DATA_OFFSET(i); // When Decimal in parquet is stored in byte arrays, binary and fixed, // the unscaled number must be encoded as two's complement using big-endian byte order. Int128 value = buf_start[0] & 0x80 ? 
-1 : 0; @@ -279,21 +304,22 @@ Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, Data } DecimalPrimitiveType cast_value(value); column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); - _offset += _type_length; + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } template <typename DecimalPrimitiveType, typename DecimalPhysicalType> -Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, - DataTypePtr& data_type, size_t num_values) { +Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, size_t num_values) { init_decimal_converter<DecimalPrimitiveType>(data_type); auto& column_data = static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); DecimalScaleParams& scale_params = _decode_params->decimal_scale; for (int i = 0; i < num_values; ++i) { + char* buf_start = _FIXED_GET_DATA_OFFSET(i); // we should use decimal128 to scale up/down - Int128 value = *reinterpret_cast<DecimalPhysicalType*>(_data->data + _offset); + Int128 value = *reinterpret_cast<DecimalPhysicalType*>(buf_start); if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { value *= scale_params.scale_factor; } else if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { @@ -301,44 +327,62 @@ Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, } DecimalPrimitiveType cast_value(value); column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); - _offset += _type_length; + _FIXED_SHIFT_DATA_OFFSET(); } return Status::OK(); } -class ByteArrayPlainDecoder final : public Decoder { +class ByteArrayDecoder final : public Decoder { public: - ByteArrayPlainDecoder() = default; - ~ByteArrayPlainDecoder() override = default; + ByteArrayDecoder() = default; + ~ByteArrayDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, 
size_t num_values) override; - Status decode_values(Slice& slice, size_t num_values) override; - Status skip_values(size_t num_values) override; + void set_data(Slice* data) override; + + Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override; + protected: template <typename DecimalPrimitiveType> Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values); + + // For dictionary encoding + bool _has_dict = false; + std::unique_ptr<uint8_t[]> _dict = nullptr; + std::vector<uint32_t> _dict_offsets; + std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; + std::vector<uint32_t> _indexes; }; template <typename DecimalPrimitiveType> -Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, - DataTypePtr& data_type, size_t num_values) { +Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, size_t num_values) { init_decimal_converter<DecimalPrimitiveType>(data_type); auto& column_data = static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data(); DecimalScaleParams& scale_params = _decode_params->decimal_scale; for (int i = 0; i < num_values; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); + char* buf_start; + uint32_t length; + if (_has_dict) { + uint32_t idx = _indexes[i]; + uint32_t idx_cursor = _dict_offsets[idx]; + buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor); + length = _dict_offsets[idx + 1] - idx_cursor - 4; + } else { + if (UNLIKELY(_offset + 4 > _data->size)) { + return Status::IOError("Can't read byte array length from plain decoder"); + } + length = decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset); + _offset += 4; + buf_start = _data->data + _offset; + _offset += length; } - uint32_t length = - decode_fixed32_le(reinterpret_cast<const 
uint8_t*>(_data->data) + _offset); - _offset += 4; - char* buf_start = _data->data + _offset; // When Decimal in parquet is stored in byte arrays, binary and fixed, // the unscaled number must be encoded as two's complement using big-endian byte order. Int128 value = buf_start[0] & 0x80 ? -1 : 0; @@ -351,7 +395,6 @@ Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_col } DecimalPrimitiveType cast_value(value); column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value)); - _offset += length; } return Status::OK(); } @@ -374,8 +417,6 @@ public: Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) override; - Status decode_values(Slice& slice, size_t num_values) override; - Status skip_values(size_t num_values) override; protected: diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 778e7b1a66..a697fc5038 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -119,7 +119,43 @@ Status ColumnChunkReader::load_page_data() { Status ColumnChunkReader::_decode_dict_page() { const tparquet::PageHeader& header = *_page_reader->get_page_header(); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type); - // TODO(gaoxin): decode dictionary page + + // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification. + // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files. 
+ // refer: https://github.com/apache/parquet-format/blob/master/Encodings.md + tparquet::Encoding::type dict_encoding = header.dictionary_page_header.encoding; + if (dict_encoding != tparquet::Encoding::PLAIN_DICTIONARY && + dict_encoding != tparquet::Encoding::PLAIN) { + return Status::InternalError("Unsupported dictionary encoding {}", + tparquet::to_string(dict_encoding)); + } + + // Prepare dictionary data + int32_t uncompressed_size = header.uncompressed_page_size; + std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]); + if (_block_compress_codec != nullptr) { + Slice compressed_data; + RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data)); + Slice dict_slice(dict_data.get(), uncompressed_size); + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice)); + } else { + Slice dict_slice; + RETURN_IF_ERROR(_page_reader->get_page_date(dict_slice)); + // The data is stored by BufferedStreamReader, we should copy it out + memcpy(dict_data.get(), dict_slice.data, dict_slice.size); + } + + // Cache page decoder + std::unique_ptr<Decoder> page_decoder; + Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY, page_decoder); + // Set type length + page_decoder->set_type_length(_get_type_length()); + // Initialize the time convert context + page_decoder->init(_field_schema, _ctz); + // Set the dictionary data + RETURN_IF_ERROR(page_decoder->set_dict(dict_data, header.dictionary_page_header.num_values)); + _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder); + return Status::OK(); } @@ -138,6 +174,16 @@ Status ColumnChunkReader::skip_values(size_t num_values) { return _page_decoder->skip_values(num_values); } +void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_values) { + DCHECK_GE(_remaining_num_values, num_values); + CHECK(doris_column->is_nullable()); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + 
(*std::move(doris_column)).mutate().get()); + MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + data_column->insert_default(); + _remaining_num_values -= num_values; +} + size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) { DCHECK_GT(_max_rep_level, 0); return _rep_level_decoder.get_levels(levels, n); @@ -166,14 +212,6 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataType return _page_decoder->decode_values(doris_column, data_type, num_values); } -Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) { - if (UNLIKELY(_remaining_num_values < num_values)) { - return Status::IOError("Decode too many values in current page"); - } - _remaining_num_values -= num_values; - return _page_decoder->decode_values(slice, num_values); -} - int32_t ColumnChunkReader::_get_type_length() { switch (_field_schema->physical_type) { case tparquet::Type::INT32: diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index bc3fcedbe1..79fdc204dc 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -87,7 +87,7 @@ public: uint32_t remaining_num_values() const { return _remaining_num_values; }; // null values are generated from definition levels // the caller should maintain the consistency after analyzing null values from definition levels. - void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; }; + void insert_null_values(ColumnPtr& doris_column, size_t num_values); // Get the raw data of current page. Slice& get_page_data() { return _page_data; } @@ -99,8 +99,6 @@ public: // Decode values in current page into doris column. 
Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values); Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values); - // For test, Decode values in current page into slice. - Status decode_values(Slice& slice, size_t num_values); // Get the repetition level decoder of current page. LevelDecoder& rep_level_decoder() { return _rep_level_decoder; } diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet new file mode 100644 index 0000000000..268d1d7135 Binary files /dev/null and b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet differ diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.txt b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt new file mode 100644 index 0000000000..ba0a3fba5a --- /dev/null +++ b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt @@ -0,0 +1,16 @@ ++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ +|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27, 9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))| 
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| 
vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| ++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.txt b/be/test/exec/test_data/parquet_scanner/type-decoder.txt new file mode 100644 index 0000000000..afe2b6e573 --- /dev/null +++ b/be/test/exec/test_data/parquet_scanner/type-decoder.txt @@ -0,0 +1,14 @@ ++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ +|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27, 
9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))| ++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ +| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17| +| 2| 2| 2| 2| 1| 2.140000| 2.140000| NULL| b-row1| 2022-08-02 07:23:18| 2.140000000| c-row1| vc-row1| 2022-08-02| 2022-08-02| 2022-08-02 07:23:18| +| -3| -3| -3| -3| 0| -3.140000| -3.140000| s-row2| b-row2| 2022-08-03 07:23:19| -3.140000000| c-row2| vc-row2| 2022-08-03| 2022-08-03| 2022-08-03 07:23:19| +| 4| 4| 4| 4| 1| 4.140000| 4.140000| NULL| b-row3| 2022-08-04 07:24:17| 4.140000000| c-row3| vc-row3| 2022-08-04| 2022-08-04| 2022-08-04 07:24:17| +| -5| -5| -5| -5| 0| -5.140000| -5.140000| s-row4| b-row4| 2022-08-05 07:25:17| -5.140000000| c-row4| vc-row4| 2022-08-05| 2022-08-05| 2022-08-05 07:25:17| +| 6| 6| 6| 6| 0| 6.140000| 6.140000| s-row5| b-row5| 2022-08-06 07:26:17| 6.140000000| c-row5| vc-row5| 2022-08-06| 2022-08-06| 2022-08-06 07:26:17| +| -7| -7| -7| -7| 1| -7.140000| -7.140000| s-row6| b-row6| 2022-08-07 07:27:17| -7.140000000| c-row6| vc-row6| 2022-08-07| 2022-08-07| 2022-08-07 07:27:17| +| 8| 8| 8| 8| 0| 8.140000| 8.140000| NULL| b-row7| 2022-08-08 07:28:17| 8.140000000| c-row7| vc-row7| 2022-08-08| 2022-08-08| 2022-08-08 07:28:17| +| -9| -9| -9| -9| 0| -9.140000| -9.140000| s-row8| b-row8| 2022-08-09 07:29:17| -9.140000000| c-row8| vc-row8| 2022-08-09| 2022-08-09| 
2022-08-09 07:29:17| +| 10| 10| 10| 10| 0| 10.140000| 10.140000| s-row9| b-row9| 2022-08-10 07:21:17| 10.140000000| c-row9| vc-row9| 2022-08-10| 2022-08-10| 2022-08-10 07:21:17| ++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+ diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index c0f62067d3..75cc087d12 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -131,9 +131,25 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) { ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4); } +static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions, size_t num_values) { + CHECK(doris_column->is_nullable()); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(doris_column)).mutate().get()); + NullMap& map_data = nullable_column->get_null_map_data(); + int null_cnt = 0; + for (int i = 0; i < num_values; ++i) { + bool nullable = definitions[i] == 0; + if (nullable) { + null_cnt++; + } + map_data.emplace_back(nullable); + } + return null_cnt; +} + static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, ColumnPtr& doris_column, - DataTypePtr& data_type) { + DataTypePtr& data_type, level_t* definitions) { tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data; size_t start_offset = chunk_meta.__isset.dictionary_page_offset ? 
chunk_meta.dictionary_page_offset @@ -150,8 +166,46 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* chunk_reader.next_page(); // load page data into underlying container chunk_reader.load_page_data(); + int rows = chunk_reader.remaining_num_values(); + // definition levels + if (field_schema->definition_level == 0) { // required field + std::fill(definitions, definitions + rows, 1); + } else { + chunk_reader.get_def_levels(definitions, rows); + } + // fill nullable values + fill_nullable_column(doris_column, definitions, rows); // decode page data - return chunk_reader.decode_values(doris_column, data_type, chunk_reader.remaining_num_values()); + if (field_schema->definition_level == 0) { + // required column + return chunk_reader.decode_values(doris_column, data_type, rows); + } else { + // column with null values + level_t level_type = definitions[0]; + int num_values = 1; + for (int i = 1; i < rows; ++i) { + if (definitions[i] != level_type) { + if (level_type == 0) { + // null values + chunk_reader.insert_null_values(doris_column, num_values); + } else { + RETURN_IF_ERROR( + chunk_reader.decode_values(doris_column, data_type, num_values)); + } + level_type = definitions[i]; + num_values = 1; + } else { + num_values++; + } + } + if (level_type == 0) { + // null values + chunk_reader.insert_null_values(doris_column, num_values); + } else { + RETURN_IF_ERROR(chunk_reader.decode_values(doris_column, data_type, num_values)); + } + return Status::OK(); + } } static void create_block(std::unique_ptr<vectorized::Block>& block) { @@ -192,10 +246,11 @@ static void create_block(std::unique_ptr<vectorized::Block>& block) { } } -TEST_F(ParquetThriftReaderTest, type_decoder) { +static void read_parquet_data_and_check(const std::string& parquet_file, + const std::string& result_file, int rows) { /* - * type-decoder.parquet is the part of following table: - * create table `type_decoder`( + * table schema in parquet file: + * create table 
`decoder`( * `tinyint_col` tinyint, // 0 * `smallint_col` smallint, // 1 * `int_col` int, // 2 @@ -213,20 +268,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { * `list_string` array<string>) // 14 */ - LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); - /* - * Data in type-decoder.parquet: - * -1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0 2022-08-01 07:23:17 -1.14 c-row0 vc-row0 2022-08-01 ["as-0","as-1"] - * 2 2 2 2 true 2.14 2.14 NULL b-row1 2022-08-02 07:23:18 2.14 c-row1 vc-row1 2022-08-02 [null,"as-3"] - * -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2 2022-08-03 07:23:19 -3.14 c-row2 vc-row2 2022-08-03 [] - * 4 4 4 4 true 4.14 4.14 NULL b-row3 2022-08-04 07:24:17 4.14 c-row3 vc-row3 2022-08-04 ["as-4"] - * -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4 2022-08-05 07:25:17 -5.14 c-row4 vc-row4 2022-08-05 ["as-5",null] - * 6 6 6 6 false 6.14 6.14 s-row5 b-row5 2022-08-06 07:26:17 6.14 c-row5 vc-row5 2022-08-06 [null,null] - * -7 -7 -7 -7 true -7.14 -7.14 s-row6 b-row6 2022-08-07 07:27:17 -7.14 c-row6 vc-row6 2022-08-07 ["as-6","as-7"] - * 8 8 8 8 false 8.14 8.14 NULL b-row7 2022-08-08 07:28:17 8.14 c-row7 vc-row7 2022-08-08 ["as-0","as-8"] - * -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8 2022-08-09 07:29:17 -9.14 c-row8 vc-row8 2022-08-09 ["as-9","as-10"] - * 10 10 10 10 false 10.14 10.14 s-row9 b-row9 2022-08-10 07:21:17 10.14 c-row9 vc-row9 2022-08-10 ["as-11","as-12"] - */ + LocalFileReader reader(parquet_file, 0); auto st = reader.open(); EXPECT_TRUE(st.ok()); @@ -237,194 +279,15 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); FieldDescriptor schema_descriptor; schema_descriptor.parse_from_thrift(t_metadata.schema); - int rows = 10; + level_t defs[rows]; - // the physical_type of tinyint_col, smallint_col and int_col are all INT32 - // they are distinguished by converted_type(in FieldSchema.parquet_schema.converted_type) - { - auto& 
column_name_with_type = block->get_by_position(0); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[0], - const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column, - data_type); - int int_sum = 0; - for (int i = 0; i < rows; ++i) { - int_sum += (int8_t)data_column->get64(i); - } - ASSERT_EQ(int_sum, 5); - } - { - auto& column_name_with_type = block->get_by_position(1); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[1], - const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column, - data_type); - int int_sum = 0; - for (int i = 0; i < rows; ++i) { - int_sum += (int16_t)data_column->get64(i); - } - ASSERT_EQ(int_sum, 5); - } - { - auto& column_name_with_type = block->get_by_position(2); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[2], - const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column, - data_type); - int int_sum = 0; - for (int i = 0; i < rows; ++i) { - int_sum += (int32_t)data_column->get64(i); - } - ASSERT_EQ(int_sum, 5); - } - { - auto& column_name_with_type = block->get_by_position(3); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[3], - const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column, - data_type); - int64_t int_sum = 0; - for (int i = 0; i < rows; ++i) { - int_sum += (int64_t)data_column->get64(i); - } - ASSERT_EQ(int_sum, 5); - } - // `boolean_col` boolean, // 4 - { - auto& column_name_with_type = block->get_by_position(4); + for (int c = 0; c < 14; ++c) { + auto& column_name_with_type = block->get_by_position(c); auto& data_column = 
column_name_with_type.column; auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[4], - const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column, - data_type); - ASSERT_FALSE(static_cast<bool>(data_column->get64(0))); - ASSERT_TRUE(static_cast<bool>(data_column->get64(1))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(2))); - ASSERT_TRUE(static_cast<bool>(data_column->get64(3))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(4))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(5))); - ASSERT_TRUE(static_cast<bool>(data_column->get64(6))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(7))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(8))); - ASSERT_FALSE(static_cast<bool>(data_column->get64(9))); - } - // `double_col` double, // 6 - { - auto& column_name_with_type = block->get_by_position(6); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[6], - const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - ASSERT_EQ(nested_column->get_float64(0), -1.14); - ASSERT_EQ(nested_column->get_float64(1), 2.14); - ASSERT_EQ(nested_column->get_float64(2), -3.14); - ASSERT_EQ(nested_column->get_float64(3), 4.14); - } - // `string_col` string, // 7 - { - auto& column_name_with_type = block->get_by_position(7); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[7]; - tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data; - size_t start_offset = chunk_meta.__isset.dictionary_page_offset - ? 
chunk_meta.dictionary_page_offset - : chunk_meta.data_page_offset; - size_t chunk_size = chunk_meta.total_compressed_size; - BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size); - cctz::time_zone ctz; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); - ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, - const_cast<FieldSchema*>(schema_descriptor.get_column(7)), - &ctz); - // initialize chunk reader - chunk_reader.init(); - // seek to next page header - chunk_reader.next_page(); - // load page data into underlying container - chunk_reader.load_page_data(); - - level_t defs[rows]; - // Analyze null string - chunk_reader.get_def_levels(defs, rows); - ASSERT_EQ(defs[1], 0); - ASSERT_EQ(defs[3], 0); - ASSERT_EQ(defs[7], 0); - - chunk_reader.decode_values(data_column, data_type, 7); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - auto row0 = nested_column->get_data_at(0).data; - auto row2 = nested_column->get_data_at(1).data; - ASSERT_STREQ("s-row0", row0); - ASSERT_STREQ("s-row2", row2); - } - // `timestamp_col` timestamp, // 9, DATETIME - { - auto& column_name_with_type = block->get_by_position(9); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[9], - const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - int64_t date_value = (int64_t)nested_column->get64(0); - VecDateTimeInt64Union conv = {.i64 = date_value}; - auto dt = conv.dt; - ASSERT_EQ(dt.hour(), 7); - ASSERT_EQ(dt.minute(), 23); - ASSERT_EQ(dt.second(), 17); - } - // 
`decimal_col` decimal, // 10 - { - auto& column_name_with_type = block->get_by_position(10); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[10], - const_cast<FieldSchema*>(schema_descriptor.get_column(10)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - int neg = 1; - for (int i = 0; i < rows; ++i) { - neg *= -1; - auto decimal_field = nested_column->operator[](i) - .get<vectorized::DecimalField<vectorized::Decimal128>>(); - EXPECT_EQ(DecimalV2Value(decimal_field.get_value()), - DecimalV2Value(std::to_string(neg * (1.14 + i)))); - } - } - // `date_col` date, // 13, DATE - { - auto& column_name_with_type = block->get_by_position(13); - auto& data_column = column_name_with_type.column; - auto& data_type = column_name_with_type.type; - get_column_values(&reader, &t_metadata.row_groups[0].columns[13], - const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - for (int i = 0; i < rows; ++i) { - int64_t date_value = (int64_t)nested_column->get64(i); - VecDateTimeInt64Union conv = {.i64 = date_value}; - auto dt = conv.dt; - ASSERT_EQ(dt.year(), 2022); - ASSERT_EQ(dt.month(), 8); - ASSERT_EQ(dt.day(), i + 1); - } + get_column_values(&reader, &t_metadata.row_groups[0].columns[c], + const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column, + data_type, defs); } // `date_v2_col` date, // 14 - 13, DATEV2 { @@ -433,18 +296,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { auto& data_type = column_name_with_type.type; get_column_values(&reader, 
&t_metadata.row_groups[0].columns[13], const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - for (int i = 0; i < rows; ++i) { - uint32_t date_value = (uint32_t)nested_column->get64(i); - DateV2UInt32Union conv = {.ui32 = date_value}; - auto dt = conv.dt; - ASSERT_EQ(dt.year(), 2022); - ASSERT_EQ(dt.month(), 8); - ASSERT_EQ(dt.day(), i + 1); - } + data_type, defs); } // `timestamp_v2_col` timestamp, // 15 - 9, DATETIMEV2 { @@ -453,17 +305,28 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { auto& data_type = column_name_with_type.type; get_column_values(&reader, &t_metadata.row_groups[0].columns[9], const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column, - data_type); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(data_column)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - uint64_t date_value = nested_column->get64(0); - DateTimeV2UInt64Union conv = {.ui64 = date_value}; - auto dt = conv.dt; - ASSERT_EQ(dt.hour(), 7); - ASSERT_EQ(dt.minute(), 23); - ASSERT_EQ(dt.second(), 17); + data_type, defs); } + + LocalFileReader result(result_file, 0); + auto rst = result.open(); + EXPECT_TRUE(rst.ok()); + uint8_t result_buf[result.size() + 1]; + result_buf[result.size()] = '\0'; + int64_t bytes_read; + bool eof; + result.read(result_buf, result.size(), &bytes_read, &eof); + ASSERT_STREQ(block->dump_data(0, rows).c_str(), reinterpret_cast<char*>(result_buf)); +} + +TEST_F(ParquetThriftReaderTest, type_decoder) { + read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", + "./be/test/exec/test_data/parquet_scanner/type-decoder.txt", 10); +} + +TEST_F(ParquetThriftReaderTest, dict_decoder) { + 
read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/dict-decoder.parquet", + "./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12); } TEST_F(ParquetThriftReaderTest, column_reader) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org