This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 37d1180cca [feature-wip](parquet-reader)decode parquet data (#11536)
37d1180cca is described below

commit 37d1180cca26494f65ec6074ab68a4350cf083e6
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Aug 8 12:44:06 2022 +0800

    [feature-wip](parquet-reader)decode parquet data (#11536)
---
 be/src/common/config.h                             |   1 +
 be/src/io/buffered_reader.cpp                      |  19 ++-
 be/src/io/buffered_reader.h                        |  22 ++--
 be/src/util/block_compression.cpp                  |  30 +++++
 be/src/util/block_compression.h                    |   4 +
 be/src/util/rle_encoding.h                         |  56 +++++++++
 be/src/vec/CMakeLists.txt                          |   4 +-
 be/src/vec/exec/format/parquet/level_decoder.cpp   |  76 ++++++++++++
 be/src/vec/exec/format/parquet/level_decoder.h     |  61 ++++++++++
 be/src/vec/exec/format/parquet/parquet_common.cpp  |  60 ++++++++++
 be/src/vec/exec/format/parquet/parquet_common.h    | 108 +++++++++++++++++
 be/src/vec/exec/format/parquet/schema_desc.cpp     |  11 +-
 .../parquet/vparquet_column_chunk_reader.cpp       | 130 +++++++++++++++++++--
 .../format/parquet/vparquet_column_chunk_reader.h  | 111 +++++++++++++++++-
 .../exec/format/parquet/vparquet_page_reader.cpp   |  30 +++--
 .../vec/exec/format/parquet/vparquet_page_reader.h |  16 ++-
 .../test_data/parquet_scanner/type-decoder.parquet | Bin 0 -> 338 bytes
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  55 +++++++++
 18 files changed, 734 insertions(+), 60 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 258000e540..59e56bcde3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -792,6 +792,7 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 CONF_Bool(parquet_predicate_push_down, "true");
+CONF_Int32(parquet_header_max_size, "8388608");
 
 // When the rows number reached this limit, will check the filter rate the of bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index 8e2446b9e1..ca40979321 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -185,10 +185,11 @@ bool BufferedReader::closed() {
     return _reader->closed();
 }
 
-BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, int64_t offset, int64_t length)
+BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset,
+                                                   uint64_t length)
         : _file(file), _file_start_offset(offset), _file_end_offset(offset + length) {}
 
-Status BufferedFileStreamReader::seek(int64_t position) {
+Status BufferedFileStreamReader::seek(uint64_t position) {
     if (_file_position != position) {
         RETURN_IF_ERROR(_file->seek(position));
         _file_position = position;
@@ -196,8 +197,8 @@ Status BufferedFileStreamReader::seek(int64_t position) {
     return Status::OK();
 }
 
-Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, int64_t offset,
-                                            int64_t* bytes_to_read) {
+Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
+                                            size_t* bytes_to_read) {
     if (offset < _file_start_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
@@ -230,19 +231,15 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, int64_t offset,
     RETURN_IF_ERROR(seek(_buf_end_offset));
     bool eof = false;
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
-    RETURN_IF_ERROR(
-            _file->read(_buf.get() + buf_remaining, _buf_size - buf_remaining, &to_read, &eof));
+    RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read, &eof));
     *bytes_to_read = buf_remaining + to_read;
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
 }
 
-Status BufferedFileStreamReader::read_bytes(Slice& slice, int64_t offset) {
-    int64_t bytes_to_read = slice.size;
-    Status st = read_bytes((const uint8_t**)&slice.data, offset, &bytes_to_read);
-    slice.size = bytes_to_read;
-    return st;
+Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
+    return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
 }
 
 } // namespace doris
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index d4a5f37927..2cfcaaa413 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -93,34 +93,34 @@ public:
      * @param offset start offset ot read in stream
      * @param bytes_to_read bytes to read
      */
-    virtual Status read_bytes(const uint8_t** buf, int64_t offset, int64_t* bytes_to_read) = 0;
+    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) = 0;
     /**
     * Save the data address to slice.data, and the slice.size is the bytes to read.
     */
-    virtual Status read_bytes(Slice& slice, int64_t offset) = 0;
+    virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
 
     virtual ~BufferedStreamReader() = default;
 };
 
 class BufferedFileStreamReader : public BufferedStreamReader {
 public:
-    BufferedFileStreamReader(FileReader* file, int64_t offset, int64_t length);
+    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length);
     ~BufferedFileStreamReader() override = default;
 
-    Status read_bytes(const uint8_t** buf, int64_t stream_offset, int64_t* bytes_to_read) override;
-    Status read_bytes(Slice& slice, int64_t stream_offset) override;
+    Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) override;
+    Status read_bytes(Slice& slice, uint64_t offset) override;
 
private:
     std::unique_ptr<uint8_t[]> _buf;
     FileReader* _file;
-    int64_t _file_start_offset;
-    int64_t _file_end_offset;
+    uint64_t _file_start_offset;
+    uint64_t _file_end_offset;
     int64_t _file_position = -1;
-    int64_t _buf_start_offset = 0;
-    int64_t _buf_end_offset = 0;
-    int64_t _buf_size = 0;
+    uint64_t _buf_start_offset = 0;
+    uint64_t _buf_end_offset = 0;
+    size_t _buf_size = 0;
 
-    Status seek(int64_t position);
+    Status seek(uint64_t position);
 };
 
 } // namespace doris
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index 4279080b21..1dafd20f54 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -550,4 +550,34 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type,
     return st;
 }
 
+Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    BlockCompressionCodec* ptr = nullptr;
+    switch (parquet_codec) {
+    case tparquet::CompressionCodec::UNCOMPRESSED:
+        codec.reset(nullptr);
+        return Status::OK();
+    case tparquet::CompressionCodec::SNAPPY:
+        ptr = new SnappyBlockCompression();
+        break;
+    case tparquet::CompressionCodec::LZ4:
+        ptr = new Lz4BlockCompression();
+        break;
+    case tparquet::CompressionCodec::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
+    default:
+        return Status::NotFound("unknown compression type({})", parquet_codec);
+    }
+
+    Status st = ptr->init();
+    if (st.ok()) {
+        codec.reset(ptr);
+    } else {
+        delete ptr;
+    }
+
+    return st;
+}
+
 } // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ddda23a3ba..9de8eb16ff 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "gen_cpp/parquet_types.h"
 #include "gen_cpp/segment_v2.pb.h"
 #include "util/slice.h"
 
@@ -70,4 +71,7 @@ public:
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& codec);
 
+Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
+                                   std::unique_ptr<BlockCompressionCodec>& codec);
+
 } // namespace doris
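[Editor's note] A minimal sketch of how the new parquet overload of get_block_compression_codec() combines with BlockCompressionCodec::decompress(); the helper name and buffer handling here are illustrative, not part of this commit:

    #include <memory>
    #include "util/block_compression.h"

    // Hypothetical helper: decompress one parquet page. `compressed` holds the raw
    // page bytes; `uncompressed_size` comes from the page header.
    doris::Status decompress_page(const doris::Slice& compressed, size_t uncompressed_size,
                                  tparquet::CompressionCodec::type codec_type,
                                  std::unique_ptr<uint8_t[]>& buf, doris::Slice* out) {
        std::unique_ptr<doris::BlockCompressionCodec> codec;
        RETURN_IF_ERROR(doris::get_block_compression_codec(codec_type, codec));
        if (codec == nullptr) { // UNCOMPRESSED resets the codec to nullptr
            *out = compressed;
            return doris::Status::OK();
        }
        buf.reset(new uint8_t[uncompressed_size]);
        *out = doris::Slice(buf.get(), uncompressed_size);
        return codec->decompress(compressed, out); // writes into out->data
    }

This mirrors what ColumnChunkReader::load_page_data() does later in this commit with its reusable _decompress_buf.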
diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index 3340669b78..08a7a23a4d 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -110,6 +110,14 @@ public:
     // GetNextRun will return more from the same run.
     size_t GetNextRun(T* val, size_t max_run);
 
+    size_t get_values(T* values, size_t num_values);
+
+    // Get the count of the current repeated value
+    size_t repeated_count();
+
+    // Get the current repeated value; `count` must not exceed repeated_count()
+    T get_repeated_value(size_t count);
+
 private:
     bool ReadHeader();
 
@@ -334,6 +342,54 @@ inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
     return ret;
 }
 
+template <typename T>
+inline size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
+    size_t read_num = 0;
+    while (read_num < num_values) {
+        size_t read_this_time = num_values - read_num;
+
+        if (LIKELY(repeat_count_ > 0)) {
+            read_this_time = std::min((size_t)repeat_count_, read_this_time);
+            std::fill(values, values + read_this_time, current_value_);
+            values += read_this_time;
+            repeat_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else if (literal_count_ > 0) {
+            read_this_time = std::min((size_t)literal_count_, read_this_time);
+            for (size_t i = 0; i < read_this_time; ++i) {
+                bool result = bit_reader_.GetValue(bit_width_, values);
+                DCHECK(result);
+                values++;
+            }
+            literal_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else {
+            if (!ReadHeader()) {
+                return read_num;
+            }
+        }
+    }
+    return read_num;
+}
+
+template <typename T>
+inline size_t RleDecoder<T>::repeated_count() {
+    if (repeat_count_ > 0) {
+        return repeat_count_;
+    }
+    if (literal_count_ == 0) {
+        ReadHeader();
+    }
+    return repeat_count_;
+}
+
+template <typename T>
+inline T RleDecoder<T>::get_repeated_value(size_t count) {
+    DCHECK_GE(repeat_count_, count);
+    repeat_count_ -= count;
+    return current_value_;
+}
+
 template <typename T>
 inline size_t RleDecoder<T>::Skip(size_t to_skip) {
     DCHECK(bit_reader_.is_initialized());
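[Editor's note] For illustration only (not in the commit): the new get_values()/repeated_count()/get_repeated_value() API can be driven like this, assuming `decoder` is an already-initialized doris RleDecoder<T>:

    #include <algorithm>
    #include "util/rle_encoding.h"

    // Drain up to n values, taking the repeated-run fast path when one is available.
    template <typename T>
    size_t drain(doris::RleDecoder<T>& decoder, T* out, size_t n) {
        size_t total = 0;
        while (total < n) {
            size_t run = decoder.repeated_count(); // length of the current repeated run
            if (run > 0) {
                size_t take = std::min(run, n - total);
                T v = decoder.get_repeated_value(take); // consumes `take` values of the run
                std::fill(out + total, out + total + take, v);
                total += take;
            } else {
                size_t got = decoder.get_values(out + total, n - total);
                if (got == 0) break; // stream exhausted
                total += got;
            }
        }
        return total;
    }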
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56fdb70a4b..bc37c49310 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -229,7 +229,9 @@ set(VEC_FILES
     exec/format/parquet/vparquet_file_metadata.cpp
     exec/format/parquet/vparquet_page_reader.cpp
     exec/format/parquet/schema_desc.cpp
-    exec/format/parquet/vparquet_column_reader.cpp)
+    exec/format/parquet/vparquet_column_reader.cpp
+    exec/format/parquet/level_decoder.cpp
+    exec/format/parquet/parquet_common.cpp)
 
 add_library(Vec STATIC
     ${VEC_FILES}
diff --git a/be/src/vec/exec/format/parquet/level_decoder.cpp b/be/src/vec/exec/format/parquet/level_decoder.cpp
new file mode 100644
index 0000000000..616da10b8e
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "level_decoder.h"
+
+#include "util/bit_util.h"
+#include "util/coding.h"
+
+doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
+                                                    tparquet::Encoding::type encoding,
+                                                    doris::vectorized::level_t max_level,
+                                                    uint32_t num_levels) {
+    _encoding = encoding;
+    _bit_width = BitUtil::log2(max_level + 1);
+    _max_level = max_level;
+    _num_levels = num_levels;
+    switch (encoding) {
+    case tparquet::Encoding::RLE: {
+        if (slice->size < 4) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+
+        uint8_t* data = (uint8_t*)slice->data;
+        uint32_t num_bytes = decode_fixed32_le(data);
+        if (num_bytes > slice->size - 4) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+        _rle_decoder = RleDecoder<level_t>(data + 4, num_bytes, _bit_width);
+
+        slice->data += 4 + num_bytes;
+        slice->size -= 4 + num_bytes;
+        break;
+    }
+    case tparquet::Encoding::BIT_PACKED: {
+        uint32_t num_bits = num_levels * _bit_width;
+        uint32_t num_bytes = BitUtil::RoundUpNumBytes(num_bits);
+        if (num_bytes > slice->size) {
+            return Status::Corruption("Wrong parquet level format");
+        }
+        _bit_packed_decoder = BitReader((uint8_t*)slice->data, num_bytes);
+
+        slice->data += num_bytes;
+        slice->size -= num_bytes;
+        break;
+    }
+    default:
+        return Status::IOError("Unsupported encoding for parquet level");
+    }
+    return Status::OK();
+}
+
+size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t* levels, size_t n) {
+    if (_encoding == tparquet::Encoding::RLE) {
+        n = std::min((size_t)_num_levels, n);
+        auto num_decoded = _rle_decoder.get_values(levels, n);
+        _num_levels -= num_decoded;
+        return num_decoded;
+    } else if (_encoding == tparquet::Encoding::BIT_PACKED) {
+        // TODO(gaoxin): BIT_PACKED encoding
+    }
+    return 0;
+}
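[Editor's note] A hand-assembled sketch of the byte layout LevelDecoder::init() expects for RLE-encoded levels: a little-endian uint32 byte count followed by the RLE payload. The bytes below are made up for illustration and assume using-declarations for doris and doris::vectorized:

    // 10 definition levels, all equal to 1, with max_def_level = 1 (bit width 1).
    // The parquet RLE run header is a varint (count << 1): 10 << 1 = 0x14,
    // followed by the repeated value packed into one byte.
    uint8_t buf[] = {0x02, 0x00, 0x00, 0x00,  // payload size = 2 (little-endian)
                     0x14, 0x01};             // repeated run: 10 x level 1
    Slice levels_slice((char*)buf, sizeof(buf));
    LevelDecoder decoder;
    // init() consumes the level section and advances the slice to the value section.
    Status st = decoder.init(&levels_slice, tparquet::Encoding::RLE,
                             /*max_level*/ 1, /*num_levels*/ 10);
    level_t levels[10];
    size_t n = decoder.get_levels(levels, 10); // n == 10, each level == 1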
diff --git a/be/src/vec/exec/format/parquet/level_decoder.h b/be/src/vec/exec/format/parquet/level_decoder.h
new file mode 100644
index 0000000000..be6c6c6154
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "parquet_common.h"
+#include "util/bit_stream_utils.h"
+#include "util/rle_encoding.h"
+
+namespace doris::vectorized {
+
+class LevelDecoder {
+public:
+    LevelDecoder() = default;
+    ~LevelDecoder() = default;
+
+    Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level,
+                uint32_t num_levels);
+
+    bool has_levels() const { return _num_levels > 0; }
+
+    size_t get_levels(level_t* levels, size_t n);
+
+    size_t next_repeated_count() {
+        DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+        return _rle_decoder.repeated_count();
+    }
+
+    level_t get_repeated_value(size_t count) {
+        DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+        return _rle_decoder.get_repeated_value(count);
+    }
+
+private:
+    tparquet::Encoding::type _encoding;
+    level_t _bit_width = 0;
+    level_t _max_level = 0;
+    uint32_t _num_levels = 0;
+    RleDecoder<level_t> _rle_decoder;
+    BitReader _bit_packed_decoder;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
new file mode 100644
index 0000000000..ec0c3ce411
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet_common.h"
+
+namespace doris::vectorized {
+
+Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                           std::unique_ptr<Decoder>& decoder) {
+    switch (encoding) {
+    case tparquet::Encoding::PLAIN:
+        switch (type) {
+        case tparquet::Type::INT32:
+            decoder.reset(new PlainDecoder<Int32>());
+            break;
+        case tparquet::Type::INT64:
+            decoder.reset(new PlainDecoder<Int64>());
+            break;
+        case tparquet::Type::FLOAT:
+            decoder.reset(new PlainDecoder<Float32>());
+            break;
+        case tparquet::Type::DOUBLE:
+            decoder.reset(new PlainDecoder<Float64>());
+            break;
+        default:
+            return Status::InternalError("Unsupported plain type {} in parquet decoder",
+                                         tparquet::to_string(type));
+        }
+    case tparquet::Encoding::RLE_DICTIONARY:
+        break;
+    default:
+        return Status::InternalError("Unsupported encoding {} in parquet decoder",
+                                     tparquet::to_string(encoding));
+    }
+    return Status::OK();
+}
+
+MutableColumnPtr Decoder::getMutableColumnPtr(ColumnPtr& doris_column) {
+    // the src column is always nullable, to keep the conversion simple
+    CHECK(doris_column->is_nullable());
+    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+            (*std::move(doris_column)).mutate().get());
+    return nullable_column->get_nested_column_ptr();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
new file mode 100644
index 0000000000..0f0e9f6e3d
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+
+namespace doris::vectorized {
+
+using level_t = int16_t;
+
+class Decoder {
+public:
+    Decoder() = default;
+    virtual ~Decoder() = default;
+
+    static Status getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                             std::unique_ptr<Decoder>& decoder);
+
+    static MutableColumnPtr getMutableColumnPtr(ColumnPtr& doris_column);
+
+    // The length for fixed-length types
+    void set_type_length(int32_t type_length) { _type_length = type_length; }
+
+    // Set the data to be decoded
+    void set_data(Slice* data) {
+        _data = data;
+        _offset = 0;
+    }
+
+    // Write a batch of decoded values to doris's column
+    virtual Status decode_values(ColumnPtr& doris_column, size_t num_values) = 0;
+
+    virtual Status decode_values(Slice& slice, size_t num_values) = 0;
+
+    virtual Status skip_values(size_t num_values) = 0;
+
+protected:
+    int32_t _type_length;
+    Slice* _data = nullptr;
+    uint32_t _offset = 0;
+};
+
+template <typename T>
+class PlainDecoder final : public Decoder {
+public:
+    PlainDecoder() = default;
+    ~PlainDecoder() override = default;
+
+    Status decode_values(ColumnPtr& doris_column, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        auto data_column = getMutableColumnPtr(doris_column);
+        auto& column_data = static_cast<ColumnVector<T>&>(*data_column).get_data();
+        const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset);
+        column_data.insert(raw_data, raw_data + num_values);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    Status decode_values(Slice& slice, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        if (UNLIKELY(to_read_bytes > slice.size)) {
+            return Status::IOError(
+                    "Slice does not have enough space to write out the decoding data");
+        }
+        memcpy(slice.data, _data->data + _offset, to_read_bytes);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    Status skip_values(size_t num_values) override {
+        _offset += TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        return Status::OK();
+    }
+
+protected:
+    enum { TYPE_LENGTH = sizeof(T) };
+};
+
+} // namespace doris::vectorized
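[Editor's note] A small usage sketch for the decoder classes above (the values are assumed, not from the commit, and using-declarations for doris::vectorized are assumed): getDecoder() selects PlainDecoder<Int32> for a PLAIN-encoded INT32 page, and decode_values(Slice&, n) copies the raw little-endian values out:

    int32_t raw[] = {1, 2, 3, 4}; // as they would appear in a PLAIN data page
    Slice page((char*)raw, sizeof(raw));

    std::unique_ptr<Decoder> decoder;
    Status st = Decoder::getDecoder(tparquet::Type::INT32, tparquet::Encoding::PLAIN, decoder);
    decoder->set_data(&page); // resets the internal read offset to 0

    int32_t out[4];
    Slice out_slice((char*)out, sizeof(out));
    st = decoder->decode_values(out_slice, 4); // plain decode is a bounds-checked memcpy
    // out now holds {1, 2, 3, 4}; decoder->skip_values(k) would advance 4*k bytes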
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 51969220ed..7f6f36f023 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -17,8 +17,6 @@
 
 #include "schema_desc.h"
 
-#include "gutil/strings/substitute.h"
-
 namespace doris::vectorized {
 
 static bool is_group_node(const tparquet::SchemaElement& schema) {
@@ -98,15 +96,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
     for (int i = 0; i < root_schema.num_children; ++i) {
         RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
         if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
-            return Status::InvalidArgument(
-                    strings::Substitute("Duplicated field name: {}", _fields[i].name));
+            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
         }
         _name_to_field.emplace(_fields[i].name, &_fields[i]);
     }
 
     if (_next_schema_pos != t_schemas.size()) {
-        return Status::InvalidArgument(strings::Substitute("Remaining {} unparsed schema elements",
-                                                           t_schemas.size() - _next_schema_pos));
+        return Status::InvalidArgument("Remaining {} unparsed schema elements",
+                                       t_schemas.size() - _next_schema_pos);
     }
 
     return Status::OK();
@@ -355,7 +352,7 @@ int FieldDescriptor::get_column_index(const std::string& column) const {
     return -1;
 }
 
-const FieldSchema* FieldDescriptor::get_column(const string& name) const {
+const FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
     auto it = _name_to_field.find(name);
     if (it != _name_to_field.end()) {
         return it->second;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 274882179b..11d6bfaf94 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -14,27 +14,135 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ColumnChunkReader::init() {
-    return Status();
+ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
+                                     tparquet::ColumnChunk* column_chunk, FieldSchema* fieldSchema)
+        : _max_rep_level(fieldSchema->repetition_level),
+          _max_def_level(fieldSchema->definition_level),
+          _stream_reader(reader),
+          _metadata(column_chunk->meta_data) {}
+
+Status ColumnChunkReader::init(size_t type_length) {
+    size_t start_offset = _metadata.__isset.dictionary_page_offset
+                                  ? _metadata.dictionary_page_offset
+                                  : _metadata.data_page_offset;
+    size_t chunk_size = _metadata.total_compressed_size;
+    _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
+
+    if (_metadata.__isset.dictionary_page_offset) {
+        RETURN_IF_ERROR(_decode_dict_page());
+    }
+    // seek to the first data page
+    _page_reader->seek_to_page(_metadata.data_page_offset);
+
+    // get the block compression codec
+    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
+    // -1 means unfixed length type
+    _type_length = type_length;
+
+    return Status::OK();
+}
+
+Status ColumnChunkReader::next_page() {
+    RETURN_IF_ERROR(_page_reader->next_page());
+    _num_values = _page_reader->get_page_header()->data_page_header.num_values;
+    return Status::OK();
+}
+
+Status ColumnChunkReader::load_page_data() {
+    const auto& header = *_page_reader->get_page_header();
+    // int32_t compressed_size = header.compressed_page_size;
+    int32_t uncompressed_size = header.uncompressed_page_size;
+
+    if (_block_compress_codec != nullptr) {
+        Slice compressed_data;
+        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        // check decompressed buffer size
+        _reserve_decompress_buf(uncompressed_size);
+        _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+    } else {
+        RETURN_IF_ERROR(_page_reader->get_page_date(_page_data));
+    }
+
+    // Initialize repetition level and definition level. Skip when level = 0, which means required field.
+    if (_max_rep_level > 0) {
+        RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
+                                                header.data_page_header.repetition_level_encoding,
+                                                _max_rep_level, _num_values));
+    }
+    if (_max_def_level > 0) {
+        RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
+                                                header.data_page_header.definition_level_encoding,
+                                                _max_def_level, _num_values));
+    }
+
+    auto encoding = header.data_page_header.encoding;
+    // change the deprecated encoding to RLE_DICTIONARY
+    if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
+        encoding = tparquet::Encoding::RLE_DICTIONARY;
+    }
+    Decoder::getDecoder(_metadata.type, encoding, _page_decoder);
+    _page_decoder->set_data(&_page_data);
+    if (_type_length > 0) {
+        _page_decoder->set_type_length(_type_length);
+    }
+
+    return Status::OK();
+}
+
+Status ColumnChunkReader::_decode_dict_page() {
+    int64_t dict_offset = _metadata.dictionary_page_offset;
+    _page_reader->seek_to_page(dict_offset);
+    _page_reader->next_page();
+    const tparquet::PageHeader& header = *_page_reader->get_page_header();
+    DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
+    // TODO(gaoxin): decode dictionary page
+    return Status::OK();
+}
+
+void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
+    if (size > _decompress_buf_size) {
+        _decompress_buf_size = BitUtil::next_power_of_two(size);
+        _decompress_buf.reset(new uint8_t[_decompress_buf_size]);
+    }
+}
+
+Status ColumnChunkReader::skip_values(size_t num_values) {
+    if (UNLIKELY(_num_values < num_values)) {
+        return Status::IOError("Skip too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->skip_values(num_values);
 }
 
-Status ColumnChunkReader::read_min_max_stat() {
-    return Status();
+size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_rep_level, 0);
+    return _rep_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_dict_page() {
-    return Status();
+size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_def_level, 0);
+    return _def_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_nested_page() {
-    return Status();
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t num_values) {
+    if (UNLIKELY(_num_values < num_values)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(doris_column, num_values);
 }
 
-Status ColumnChunkReader::read_next_page() {
-    return Status();
+Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
+    if (UNLIKELY(_num_values < num_values)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(slice, num_values);
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index dac287096a..c3b58be5c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -16,20 +16,119 @@
 // under the License.
 
 #pragma once
-#include <common/status.h>
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "io/buffered_reader.h"
+#include "level_decoder.h"
+#include "parquet_common.h"
+#include "schema_desc.h"
+#include "util/block_compression.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+#include "vparquet_page_reader.h"
 
 namespace doris::vectorized {
 
+/**
+ * Read and decode parquet column data into doris block column.
+ * <p>Usage:</p>
+ * // Create chunk reader
+ * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
+ *                                tparquet::ColumnChunk* column_chunk,
+ *                                FieldSchema* fieldSchema);
+ * // Initialize chunk reader; we can set the type length if the length of the column type is fixed.
+ * // If not set, the default value = -1, and the decoder will infer the type length.
+ * chunk_reader.init();
+ * while (chunk_reader.has_next_page()) {
+ *   // Seek to next page header. Only read and parse the page header, not page data.
+ *   chunk_reader.next_page();
+ *   // Load data to decoder. Load the page data into the underlying container.
+ *   // Or, we can call chunk_reader.skip_page() to skip the current page.
+ *   chunk_reader.load_page_data();
+ *   // Decode values into column or slice.
+ *   // Or, we can call chunk_reader.skip_values(num_values) to skip some values.
+ *   chunk_reader.decode_values(slice, num_values);
+ * }
+ */
 class ColumnChunkReader {
 public:
-    Status init();
-    Status read_next_page();
+    ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
+                      FieldSchema* fieldSchema);
+    ~ColumnChunkReader() = default;
+
+    // Initialize the chunk reader; will generate the decoder and codec.
+    // We can set type_length if the length of the column type is fixed;
+    // if not set, the decoder will try to infer the type_length.
+    Status init(size_t type_length = -1);
+
+    // Whether the chunk reader has more pages to read.
+    bool has_next_page() { return _page_reader->has_next_page(); }
+
+    // Seek to the specific page; page_header_offset must be the start offset of the page header.
+    void seek_to_page(int64_t page_header_offset) {
+        _page_reader->seek_to_page(page_header_offset);
+    }
+
+    // Seek to next page. Only read and parse the page header.
+    Status next_page();
+
+    // Skip current page (will not read and parse) if the page is filtered by predicates.
+    Status skip_page() { return _page_reader->skip_page(); }
+    // Skip some values (will not read and parse) in current page if the values are filtered by predicates.
+    Status skip_values(size_t num_values);
 
-    Status read_min_max_stat();
-    Status decode_dict_page();
-    Status decode_nested_page();
+    // Load page data into the underlying container,
+    // and initialize the repetition and definition level decoders for current page data.
+    Status load_page_data();
+    // The remaining number of values in current page. Decreased when reading or skipping.
+    uint32_t num_values() const { return _num_values; }
+    // Get the raw data of current page.
+    Slice& get_page_data() { return _page_data; }
+
+    // Get the repetition levels
+    size_t get_rep_levels(level_t* levels, size_t n);
+    // Get the definition levels
+    size_t get_def_levels(level_t* levels, size_t n);
+
+    // Decode values in current page into doris column.
+    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    // For test: decode values in current page into slice.
+    Status decode_values(Slice& slice, size_t num_values);
+
+    // Get the repetition level decoder of current page.
+    LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
+    // Get the definition level decoder of current page.
+    LevelDecoder& def_level_decoder() { return _def_level_decoder; }
 
 private:
+    Status _decode_dict_page();
+    void _reserve_decompress_buf(size_t size);
+
+    level_t _max_rep_level;
+    level_t _max_def_level;
+
+    BufferedStreamReader* _stream_reader;
+    // tparquet::ColumnChunk* _column_chunk;
+    tparquet::ColumnMetaData& _metadata;
+    // FieldSchema* _field_schema;
+
+    std::unique_ptr<PageReader> _page_reader = nullptr;
+    std::unique_ptr<BlockCompressionCodec> _block_compress_codec = nullptr;
+
+    LevelDecoder _rep_level_decoder;
+    LevelDecoder _def_level_decoder;
+    uint32_t _num_values = 0;
+    Slice _page_data;
+    std::unique_ptr<uint8_t[]> _decompress_buf;
+    size_t _decompress_buf_size = 0;
+    std::unique_ptr<Decoder> _page_decoder = nullptr;
+    size_t _type_length = -1;
 };
 
 } // namespace doris::vectorized
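[Editor's note] A fleshed-out version of the usage comment above; this mirrors the unit test at the end of the commit. `stream_reader`, `column_chunk`, `field_schema`, `slice`, and `batch_size` are assumed to be set up by the caller, inside a function returning Status:

    ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, field_schema);
    RETURN_IF_ERROR(chunk_reader.init()); // type_length defaults to -1 (inferred)
    while (chunk_reader.has_next_page()) {
        RETURN_IF_ERROR(chunk_reader.next_page());      // parse the page header only
        RETURN_IF_ERROR(chunk_reader.load_page_data()); // decompress, init level decoders
        while (chunk_reader.num_values() > 0) {
            size_t n = std::min((size_t)chunk_reader.num_values(), batch_size);
            RETURN_IF_ERROR(chunk_reader.decode_values(slice, n)); // or skip_values(n)
            // ... consume the decoded values in `slice` ...
        }
    }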
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 533caa3db0..f554be169e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -23,12 +23,12 @@
 
 namespace doris::vectorized {
 
-static constexpr int64_t initPageHeaderSize = 1024;
+static constexpr size_t initPageHeaderSize = 1024;
 
-PageReader::PageReader(BufferedStreamReader* reader, int64_t start_offset, int64_t length)
-        : _reader(reader), _start_offset(start_offset), _end_offset(start_offset + length) {}
+PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
+        : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
 
-Status PageReader::next_page(Slice& slice) {
+Status PageReader::next_page() {
     if (_offset < _start_offset || _offset >= _end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
@@ -37,8 +37,8 @@
     }
 
     const uint8_t* page_header_buf = nullptr;
-    int64_t max_size = _end_offset - _offset;
-    int64_t header_size = std::min(initPageHeaderSize, max_size);
+    size_t max_size = _end_offset - _offset;
+    size_t header_size = std::min(initPageHeaderSize, max_size);
     uint32_t real_header_size = 0;
     while (true) {
         header_size = std::min(header_size, max_size);
@@ -49,7 +49,8 @@
         if (st.ok()) {
             break;
         }
-        if (_offset + header_size >= _end_offset) {
+        if (_offset + header_size >= _end_offset ||
+            real_header_size > config::parquet_header_max_size) {
             return Status::IOError("Failed to deserialize parquet page header");
         }
         header_size <<= 2;
@@ -57,11 +58,26 @@
     _offset += real_header_size;
     _next_header_offset = _offset + _cur_page_header.compressed_page_size;
 
+    return Status::OK();
+}
+
+Status PageReader::skip_page() {
+    if (_offset == _next_header_offset) {
+        return Status::InternalError("Should call next_page() to generate page header");
+    }
+    _offset = _next_header_offset;
+    return Status::OK();
+}
+
+Status PageReader::get_page_date(Slice& slice) {
+    if (_offset == _next_header_offset) {
+        return Status::InternalError("Should call next_page() to generate page header");
+    }
     slice.size = _cur_page_header.compressed_page_size;
     RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
     DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
     _offset += slice.size;
     return Status::OK();
 }
 
+
 } // namespace doris::vectorized
Status::OK(); } + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 523098cc44..cf95812ead 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -29,15 +29,19 @@ namespace doris::vectorized { class PageReader { public: public: - PageReader(BufferedStreamReader* reader, int64_t start_offset, int64_t length); + PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length); ~PageReader() = default; bool has_next_page() const { return _offset < _end_offset; } - Status next_page(Slice& slice); + Status next_page(); + + Status skip_page(); const tparquet::PageHeader* get_page_header() const { return &_cur_page_header; } + Status get_page_date(Slice& slice); + void seek_to_page(int64_t page_header_offset) { _offset = page_header_offset; _next_header_offset = page_header_offset; @@ -47,11 +51,11 @@ private: BufferedStreamReader* _reader; tparquet::PageHeader _cur_page_header; - int64_t _offset = 0; - int64_t _next_header_offset = 0; + uint64_t _offset = 0; + uint64_t _next_header_offset = 0; - int64_t _start_offset = 0; - int64_t _end_offset = 0; + uint64_t _start_offset = 0; + uint64_t _end_offset = 0; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet new file mode 100644 index 0000000000..e4b5820161 Binary files /dev/null and b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 5100ea32f3..02bcef6261 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -27,6 +27,7 @@ #include "io/local_file_reader.h" #include "util/runtime_profile.h" #include "vec/exec/format/parquet/parquet_thrift_util.h" +#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" #include "vec/exec/format/parquet/vparquet_file_metadata.h" namespace doris { @@ -123,6 +124,60 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) { ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4); } +TEST_F(ParquetThriftReaderTest, column_reader) { + // type-decoder.parquet is the part of following table: + // create table type-decoder ( + // int_col int) + // TODO(gaoxin): add more hive types + LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + auto st = reader.open(); + EXPECT_TRUE(st.ok()); + + std::shared_ptr<FileMetaData> metaData; + parse_thrift_footer(&reader, metaData); + tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); + + // read the `int_col` column, it's the int-type column, and has ten values: + // -1, 2, -3, 4, -5, 6, -7, 8, -9, 10 + tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[0]; + tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data; + size_t start_offset = chunk_meta.__isset.dictionary_page_offset + ? 
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_size = chunk_meta.total_compressed_size;
+    BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
+
+    FieldDescriptor schema_descriptor;
+    schema_descriptor.parse_from_thrift(t_metadata.schema);
+    auto field_schema = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+
+    ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, field_schema);
+    size_t batch_size = 10;
+    size_t int_length = 4;
+    char data[batch_size * int_length];
+    Slice slice(data, batch_size * int_length);
+    chunk_reader.init();
+    uint64_t int_sum = 0;
+    while (chunk_reader.has_next_page()) {
+        // seek to next page header
+        chunk_reader.next_page();
+        // load data to decoder
+        chunk_reader.load_page_data();
+        while (chunk_reader.num_values() > 0) {
+            size_t num_values = chunk_reader.num_values() < batch_size
+                                        ? chunk_reader.num_values()
+                                        : batch_size;
+            chunk_reader.decode_values(slice, num_values);
+            auto out_data = reinterpret_cast<Int32*>(slice.data);
+            for (int i = 0; i < num_values; i++) {
+                Int32 value = out_data[i];
+                int_sum += value;
+            }
+        }
+    }
+    ASSERT_EQ(int_sum, 5);
+}
+
 } // namespace vectorized
 } // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org