This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
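Background for the dictionary-filter change announced below: in a dictionary-encoded Parquet column chunk, every value is stored as a small integer code into a per-chunk dictionary of distinct values, so a string predicate can be evaluated once per dictionary entry instead of once per row, and row filtering then reduces to integer comparisons; if no dictionary value matches, the whole row group can be skipped. A minimal, self-contained sketch of the idea (hypothetical simplified types, not the Doris API):

    #include <cstdint>
    #include <string>
    #include <unordered_set>
    #include <vector>

    // Hypothetical model of a dictionary-encoded chunk: the data pages hold
    // int32 codes, the dictionary page holds the distinct string values.
    struct DictEncodedChunk {
        std::vector<std::string> dict; // code -> value
        std::vector<int32_t> codes;    // one code per row
    };

    template <typename Pred>
    std::vector<size_t> filter_rows(const DictEncodedChunk& chunk, Pred pred) {
        // Evaluate the predicate |dict| times instead of |rows| times.
        std::unordered_set<int32_t> matching_codes;
        for (int32_t code = 0; code < (int32_t)chunk.dict.size(); ++code) {
            if (pred(chunk.dict[code])) {
                matching_codes.insert(code);
            }
        }
        std::vector<size_t> selected;
        if (matching_codes.empty()) {
            return selected; // nothing matches: the whole chunk can be skipped
        }
        // Row filtering is now a cheap integer set lookup.
        for (size_t row = 0; row < chunk.codes.size(); ++row) {
            if (matching_codes.count(chunk.codes[row]) != 0) {
                selected.push_back(row);
            }
        }
        return selected;
    }

The commit applies this idea inside the reader: qualifying string predicates are rewritten into EQ/IN predicates over dictionary codes (_rewrite_dict_predicates below), predicate columns are read as Int32 code columns, and surviving code columns are converted back to string columns before the block is returned.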
The following commit(s) were added to refs/heads/master by this push:
     new b4b126b817 [Feature](parquet-reader) Implements dict filter functionality in parquet reader. (#17594)
b4b126b817 is described below

commit b4b126b817a78609354a1b1c6de75208d21c3479
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Mar 16 20:29:27 2023 +0800

    [Feature](parquet-reader) Implements dict filter functionality in parquet reader. (#17594)

    Implements dict filter functionality in the parquet reader to improve performance.
---
 be/src/service/internal_service.cpp                |   2 +-
 be/src/vec/data_types/data_type.cpp                |   1 +
 .../vec/exec/format/parquet/bool_plain_decoder.cpp |   2 +-
 .../vec/exec/format/parquet/bool_plain_decoder.h   |   2 +-
 .../vec/exec/format/parquet/bool_rle_decoder.cpp   |   2 +-
 be/src/vec/exec/format/parquet/bool_rle_decoder.h  |   2 +-
 .../format/parquet/byte_array_dict_decoder.cpp     |  44 +-
 .../exec/format/parquet/byte_array_dict_decoder.h  |  10 +-
 .../format/parquet/byte_array_plain_decoder.cpp    |   3 +-
 .../exec/format/parquet/byte_array_plain_decoder.h |   2 +-
 be/src/vec/exec/format/parquet/decoder.h           |  26 +-
 .../exec/format/parquet/delta_bit_pack_decoder.h   |   9 +-
 .../format/parquet/fix_length_dict_decoder.hpp     |  59 +-
 .../format/parquet/fix_length_plain_decoder.cpp    |   3 +-
 .../exec/format/parquet/fix_length_plain_decoder.h |   2 +-
 .../parquet/vparquet_column_chunk_reader.cpp       |   9 +-
 .../format/parquet/vparquet_column_chunk_reader.h  |  30 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  66 ++-
 .../exec/format/parquet/vparquet_column_reader.h   |  33 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 641 ++++++++++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h    |  44 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  33 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  17 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  15 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |   6 +-
 be/src/vec/exec/scan/new_file_scan_node.cpp        |   3 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  86 ++-
 be/src/vec/exec/scan/vfile_scanner.h               |  15 +-
 be/src/vec/exec/scan/vscan_node.cpp                |   2 +
 be/src/vec/exec/scan/vscan_node.h                  |   2 +
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   6 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  14 +-
 32 files changed, 957 insertions(+), 234 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 407ad5dad0..257509a97c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -480,7 +480,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
         break;
     }
     case TFileFormatType::FORMAT_PARQUET: {
-        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx));
+        reader.reset(new vectorized::ParquetReader(params, range, &io_ctx, nullptr));
         break;
     }
     case TFileFormatType::FORMAT_ORC: {
diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp
index 6d70850564..aee0630585 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -56,6 +56,7 @@ void IDataType::update_avg_value_size_hint(const IColumn& column, double& avg_va
 ColumnPtr IDataType::create_column_const(size_t size, const Field& field) const {
     auto column = create_column();
+    column->reserve(1);
     column->insert(field);
     return ColumnConst::create(std::move(column), size);
 }
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
index 87e6a31828..ff05c559a5 100644
---
a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp +++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp @@ -43,7 +43,7 @@ Status BoolPlainDecoder::skip_values(size_t num_values) { } Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { + ColumnSelectVector& select_vector, bool is_dict_filter) { auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data(); size_t data_index = column_data.size(); column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.h b/be/src/vec/exec/format/parquet/bool_plain_decoder.h index 77ab7f4ccd..08e54ecd60 100644 --- a/be/src/vec/exec/format/parquet/bool_plain_decoder.h +++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.h @@ -37,7 +37,7 @@ public: } Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; + ColumnSelectVector& select_vector, bool is_dict_filter) override; Status skip_values(size_t num_values) override; diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp index 563b6c68df..0856687bbf 100644 --- a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp @@ -46,7 +46,7 @@ Status BoolRLEDecoder::skip_values(size_t num_values) { } Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { + ColumnSelectVector& select_vector, bool is_dict_filter) { auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data(); size_t data_index = column_data.size(); column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.h b/be/src/vec/exec/format/parquet/bool_rle_decoder.h index 0b3ba6e05d..6b0c94825f 100644 --- a/be/src/vec/exec/format/parquet/bool_rle_decoder.h +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.h @@ -30,7 +30,7 @@ public: void set_data(Slice* slice) override; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; + ColumnSelectVector& select_vector, bool is_dict_filter) override; Status skip_values(size_t num_values) override; diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp index e5a86c5061..6962b2a47b 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp @@ -38,6 +38,7 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t total_length += l; } + _dict_value_to_code.reserve(num_values); // For insert_many_strings_overflow _dict_data.resize(total_length + MAX_STRINGS_OVERFLOW_SIZE); _max_value_length = 0; @@ -48,6 +49,7 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t offset_cursor += 4; memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l); _dict_items.emplace_back(&_dict_data[offset], l); + _dict_value_to_code[StringRef(&_dict_data[offset], l)] = i; offset_cursor += l; offset += l; if (offset_cursor > length) { @@ -63,19 +65,47 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t return Status::OK(); } +Status 
ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_column) { + doris_column->insert_many_strings_overflow(&_dict_items[0], _dict_items.size(), + _max_value_length); + return Status::OK(); +} + +Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column, + std::vector<int32_t>* dict_codes) { + for (int i = 0; i < string_column->size(); ++i) { + StringRef dict_value = string_column->get_data_at(i); + dict_codes->emplace_back(_dict_value_to_code[dict_value]); + } + return Status::OK(); +} + +MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column( + const ColumnInt32* dict_column) { + auto res = ColumnString::create(); + std::vector<StringRef> dict_values(dict_column->size()); + const auto& data = dict_column->get_data(); + for (size_t i = 0; i < dict_column->size(); ++i) { + dict_values[i] = _dict_items[data[i]]; + } + res->insert_many_strings_overflow(&dict_values[0], dict_values.size(), _max_value_length); + return res; +} + Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { + ColumnSelectVector& select_vector, bool is_dict_filter) { size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); - if (doris_column->is_column_dictionary() && - assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { - assert_cast<ColumnDictI32&>(*doris_column) - .insert_many_dict_data(&_dict_items[0], _dict_items.size()); + if (doris_column->is_column_dictionary()) { + ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column); + if (dict_column.dict_size() == 0) { + dict_column.insert_many_dict_data(&_dict_items[0], _dict_items.size()); + } } _indexes.resize(non_null_size); _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); - if (doris_column->is_column_dictionary()) { - return _decode_dict_values(doris_column, select_vector); + if (doris_column->is_column_dictionary() || is_dict_filter) { + return _decode_dict_values(doris_column, select_vector, is_dict_filter); } TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h index a90226d7c3..8492937e83 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h @@ -31,10 +31,17 @@ public: ~ByteArrayDictDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; + ColumnSelectVector& select_vector, bool is_dict_filter) override; Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override; + Status read_dict_values_to_column(MutableColumnPtr& doris_column) override; + + Status get_dict_codes(const ColumnString* column_string, + std::vector<int32_t>* dict_codes) override; + + MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; + protected: template <typename DecimalPrimitiveType> Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, @@ -44,6 +51,7 @@ protected: std::vector<StringRef> _dict_items; std::vector<uint8_t> _dict_data; size_t _max_value_length; + std::unordered_map<StringRef, int32_t> _dict_value_to_code; }; template <typename DecimalPrimitiveType> diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp index acc977dbe5..833472665b 100644 --- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp +++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp @@ -38,7 +38,8 @@ Status ByteArrayPlainDecoder::skip_values(size_t num_values) { } Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { + ColumnSelectVector& select_vector, + bool is_dict_filter) { TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); switch (logical_type) { case TypeIndex::String: diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h index 84bed0dec3..cd35ceaa6f 100644 --- a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h +++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h @@ -30,7 +30,7 @@ public: ~ByteArrayPlainDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; + ColumnSelectVector& select_vector, bool is_dict_filter) override; Status skip_values(size_t num_values) override; diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h index 827e377c2b..eb0d88bd63 100644 --- a/be/src/vec/exec/format/parquet/decoder.h +++ b/be/src/vec/exec/format/parquet/decoder.h @@ -76,7 +76,7 @@ public: // Write the decoded values batch to doris's column virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) = 0; + ColumnSelectVector& select_vector, bool is_dict_filter) = 0; virtual Status skip_values(size_t num_values) = 0; @@ -84,6 +84,19 @@ public: return Status::NotSupported("set_dict is not supported"); } + virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) { + return Status::NotSupported("read_dict_values_to_column is not supported"); + } + + virtual Status get_dict_codes(const ColumnString* column_string, + std::vector<int32_t>* dict_codes) { + return Status::NotSupported("get_dict_codes is not supported"); + } + + virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { + LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported"; + } + protected: int32_t _type_length; Slice* _data = nullptr; @@ -136,11 +149,15 @@ protected: * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type, * and the coded values must be read into _indexes previously. */ - Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { - DCHECK(doris_column->is_column_dictionary()); + Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector, + bool is_dict_filter) { + DCHECK(doris_column->is_column_dictionary() || is_dict_filter); size_t dict_index = 0; ColumnSelectVector::DataReadType read_type; - auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data(); + PaddedPODArray<Int32>& column_data = + doris_column->is_column_dictionary() + ? 
assert_cast<ColumnDictI32&>(*doris_column).get_data() + : assert_cast<ColumnInt32&>(*doris_column).get_data(); while (size_t run_length = select_vector.get_next_run(&read_type)) { switch (read_type) { case ColumnSelectVector::CONTENT: { @@ -171,7 +188,6 @@ protected: return Status::OK(); } -protected: // For dictionary encoding std::unique_ptr<uint8_t[]> _dict = nullptr; std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr; diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h index bdebd4d82f..f51941bb3a 100644 --- a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h +++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h @@ -64,7 +64,7 @@ public: : DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {} ~DeltaBitPackDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override { + ColumnSelectVector& select_vector, bool is_dict_filter) override { size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); // decode values _values.resize(non_null_size); @@ -75,7 +75,8 @@ public: _data->size = _values.size() * _type_length; // set decoded value with fix plain decoder init_values_converter(); - return _type_converted_decoder->decode_values(doris_column, data_type, select_vector); + return _type_converted_decoder->decode_values(doris_column, data_type, select_vector, + is_dict_filter); } Status decode(T* buffer, int num_values, int* out_num_values) { @@ -155,7 +156,7 @@ public: } Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override { + ColumnSelectVector& select_vector, bool is_dict_filter) override { size_t num_values = select_vector.num_values(); size_t null_count = select_vector.num_nulls(); // init read buffer @@ -222,7 +223,7 @@ public: } Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override { + ColumnSelectVector& select_vector, bool is_dict_filter) override { size_t num_values = select_vector.num_values(); size_t null_count = select_vector.num_nulls(); _values.resize(num_values - null_count); diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp index e581a65d10..64daf9325a 100644 --- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp +++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp @@ -20,6 +20,7 @@ #include "vec/columns/column_dictionary.h" #include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_nullable.h" +#include "vec/exec/format/parquet/decoder.h" namespace doris::vectorized { @@ -31,7 +32,7 @@ public: ~FixLengthDictDecoder() override = default; Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override { + ColumnSelectVector& select_vector, bool is_dict_filter) override { size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); if (doris_column->is_column_dictionary() && assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) { @@ -43,11 +44,22 @@ public: assert_cast<ColumnDictI32&>(*doris_column) .insert_many_dict_data(&dict_items[0], dict_items.size()); } + if (doris_column->is_column_dictionary()) { + ColumnDictI32& dict_column = assert_cast<ColumnDictI32&>(*doris_column); + if (dict_column.dict_size() == 0) { + 
std::vector<StringRef> dict_items;
+                dict_items.reserve(_dict_items.size());
+                for (size_t i = 0; i < _dict_items.size(); ++i) {
+                    dict_items.emplace_back((char*)(&_dict_items[i]), _type_length);
+                }
+                dict_column.insert_many_dict_data(&dict_items[0], dict_items.size());
+            }
+        }
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, is_dict_filter);
         }
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -364,7 +376,7 @@ public:
     ~FixLengthDictDecoder() override = default;
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override {
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override {
         size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
         if (doris_column->is_column_dictionary() &&
             assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
@@ -379,8 +391,8 @@ public:
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
-        if (doris_column->is_column_dictionary()) {
-            return _decode_dict_values(doris_column, select_vector);
+        if (doris_column->is_column_dictionary() || is_dict_filter) {
+            return _decode_dict_values(doris_column, select_vector, is_dict_filter);
         }
         TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -434,13 +446,47 @@ public:
         _dict = std::move(dict);
         char* dict_item_address = reinterpret_cast<char*>(_dict.get());
         _dict_items.resize(num_values);
+        _dict_value_to_code.reserve(num_values);
         for (size_t i = 0; i < num_values; ++i) {
             _dict_items[i] = dict_item_address;
+            _dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
             dict_item_address += _type_length;
         }
         return Status::OK();
     }
+
+    Status read_dict_values_to_column(MutableColumnPtr& doris_column) override {
+        size_t dict_items_size = _dict_items.size();
+        // Reserve (rather than size-construct) so emplace_back fills exactly
+        // dict_items_size entries.
+        std::vector<StringRef> dict_values;
+        dict_values.reserve(dict_items_size);
+        for (size_t i = 0; i < dict_items_size; ++i) {
+            dict_values.emplace_back(_dict_items[i], _type_length);
+        }
+        doris_column->insert_many_strings(&dict_values[0], dict_items_size);
+        return Status::OK();
+    }
+
+    Status get_dict_codes(const ColumnString* string_column,
+                          std::vector<int32_t>* dict_codes) override {
+        size_t size = string_column->size();
+        dict_codes->reserve(size);
+        for (size_t i = 0; i < size; ++i) {
+            StringRef dict_value = string_column->get_data_at(i);
+            dict_codes->emplace_back(_dict_value_to_code[dict_value]);
+        }
+        return Status::OK();
+    }
+
+    MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
+        auto res = ColumnString::create();
+        // Reserve (rather than size-construct) so the vector holds exactly one
+        // StringRef per dictionary code.
+        std::vector<StringRef> dict_values;
+        dict_values.reserve(dict_column->size());
+        const auto& data = dict_column->get_data();
+        for (size_t i = 0; i < dict_column->size(); ++i) {
+            dict_values.emplace_back(_dict_items[data[i]], _type_length);
+        }
+        res->insert_many_strings(&dict_values[0], dict_values.size());
+        return res;
+    }
+
 protected:
     template <typename DecimalPrimitiveType>
     Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
@@ -528,6 +574,7 @@ protected:
     // For dictionary encoding
     std::vector<char*> _dict_items;
+    std::unordered_map<StringRef, int32_t> _dict_value_to_code;
 };
 } // namespace doris::vectorized
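The fixed-length decoder above follows the same pattern as the byte-array decoder earlier in this commit: set_dict builds a value-to-code hash map alongside the dictionary so that get_dict_codes can translate the dictionary values that survive predicate evaluation back into the integer codes stored on the data pages, an O(1) lookup per value instead of a scan of the dictionary. A standalone sketch of that round trip, using std::string_view in place of Doris's StringRef (hypothetical names, not the Doris API):

    #include <cstdint>
    #include <string_view>
    #include <unordered_map>
    #include <vector>

    // Built once per column chunk from the dictionary page (cf. set_dict above).
    std::unordered_map<std::string_view, int32_t> build_value_to_code(
            const std::vector<std::string_view>& dict_items) {
        std::unordered_map<std::string_view, int32_t> value_to_code;
        value_to_code.reserve(dict_items.size());
        for (int32_t code = 0; code < (int32_t)dict_items.size(); ++code) {
            value_to_code.emplace(dict_items[code], code);
        }
        return value_to_code;
    }

    // Translate surviving dictionary values back into page codes
    // (cf. get_dict_codes above); the codes feed the rewritten EQ/IN conjunct.
    std::vector<int32_t> values_to_codes(
            const std::unordered_map<std::string_view, int32_t>& value_to_code,
            const std::vector<std::string_view>& surviving_values) {
        std::vector<int32_t> codes;
        codes.reserve(surviving_values.size());
        for (std::string_view value : surviving_values) {
            codes.push_back(value_to_code.at(value));
        }
        return codes;
    }

Note that the keys are views into the dictionary buffer, mirroring how _dict_value_to_code keys StringRefs into the decoder-owned dictionary memory: the map stays valid only as long as the decoder holds that buffer.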
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index e50fcc627b..01bfe67981 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -32,7 +32,8 @@ Status FixLengthPlainDecoder::skip_values(size_t num_values) {
 }
 Status FixLengthPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                            ColumnSelectVector& select_vector) {
+                                            ColumnSelectVector& select_vector,
+                                            bool is_dict_filter) {
     size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
     if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
index b8f516444e..c35a97fc3e 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
@@ -33,7 +33,7 @@ public:
     ~FixLengthPlainDecoder() override = default;
     Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                         ColumnSelectVector& select_vector) override;
+                         ColumnSelectVector& select_vector, bool is_dict_filter) override;
     Status skip_values(size_t num_values) override;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d53023c1a2..9308624246 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -52,6 +52,9 @@ Status ColumnChunkReader::init() {
 }
 Status ColumnChunkReader::next_page() {
+    if (_state == HEADER_PARSED) {
+        return Status::OK();
+    }
     if (UNLIKELY(_state == NOT_INIT)) {
         return Status::Corruption("Should initialize chunk reader");
     }
@@ -258,16 +261,16 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
 }
 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
-                                        ColumnSelectVector& select_vector) {
+                                        ColumnSelectVector& select_vector, bool is_dict_filter) {
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
-    if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) {
+    if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) {
         return Status::IOError("Not dictionary coded");
     }
     if (UNLIKELY(_remaining_num_values < select_vector.num_values())) {
         return Status::IOError("Decode too many values in current page");
     }
     _remaining_num_values -= select_vector.num_values();
-    return _page_decoder->decode_values(doris_column, data_type, select_vector);
+    return _page_decoder->decode_values(doris_column, data_type, select_vector, is_dict_filter);
 }
 int32_t ColumnChunkReader::_get_type_length() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 2bf21fbf7b..303af52104 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -86,14 +86,13 @@ public:
     // Skip the current page (will not read and parse) if the page is filtered by predicates.
Status skip_page() { + Status res = Status::OK(); _remaining_num_values = 0; if (_state == HEADER_PARSED) { - return _page_reader->skip_page(); + res = _page_reader->skip_page(); } - if (_state != DATA_LOADED) { - return Status::Corruption("Should parse page header to skip page"); - } - return Status::OK(); + _state = PAGE_SKIPPED; + return res; } // Skip some values(will not read and parse) in current page if the values are filtered by predicates. // when skip_data = false, the underlying decoder will not skip data, @@ -124,7 +123,7 @@ public: // Decode values in current page into doris column. Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector); + ColumnSelectVector& select_vector, bool is_dict_filter); // Get the repetition level decoder of current page. LevelDecoder& rep_level_decoder() { return _rep_level_decoder; } @@ -134,6 +133,8 @@ public: level_t max_rep_level() const { return _max_rep_level; } level_t max_def_level() const { return _max_def_level; } + bool has_dict() const { return _has_dict; }; + // Get page decoder Decoder* get_page_decoder() { return _page_decoder; } @@ -142,8 +143,23 @@ public: return _statistics; } + Status read_dict_values_to_column(MutableColumnPtr& doris_column) { + return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] + ->read_dict_values_to_column(doris_column); + } + + Status get_dict_codes(const ColumnString* column_string, std::vector<int32_t>* dict_codes) { + return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes( + column_string, dict_codes); + } + + MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { + return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] + ->convert_dict_column_to_string_column(dict_column); + } + private: - enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED }; + enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED }; Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 3e94a3082f..0357c77b22 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -214,7 +214,8 @@ Status ScalarColumnReader::_skip_values(size_t num_values) { } Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column, - DataTypePtr& type, ColumnSelectVector& select_vector) { + DataTypePtr& type, ColumnSelectVector& select_vector, + bool is_dict_filter) { if (num_values == 0) { return Status::OK(); } @@ -271,12 +272,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu SCOPED_RAW_TIMER(&_decode_null_map_time); select_vector.set_run_length_null_map(null_map, num_values, map_data_column); } - return _chunk_reader->decode_values(data_column, type, select_vector); + return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter); } Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof) { + size_t* read_rows, bool* eof, bool is_dict_filter) { _rep_levels.resize(0); _def_levels.resize(0); size_t parsed_rows = 0; @@ -373,7 +374,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, 
DataType
         SCOPED_RAW_TIMER(&_decode_null_map_time);
         select_vector.set_run_length_null_map(null_map, num_values, map_data_column);
     }
-    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector));
+    RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
     if (ancestor_nulls != 0) {
         _chunk_reader->skip_values(ancestor_nulls, false);
     }
@@ -384,10 +385,44 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }
     return Status::OK();
 }
+Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_column,
+                                                      bool* has_dict) {
+    bool loaded;
+    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
+    if (loaded && *has_dict) {
+        return _chunk_reader->read_dict_values_to_column(doris_column);
+    }
+    return Status::OK();
+}
+
+Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string,
+                                          std::vector<int32_t>* dict_codes) {
+    return _chunk_reader->get_dict_codes(column_string, dict_codes);
+}
+
+MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column) {
+    return _chunk_reader->convert_dict_column_to_string_column(dict_column);
+}
+
+Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) {
+    *loaded = false;
+    *has_dict = false;
+    if (_chunk_reader->remaining_num_values() == 0) {
+        if (!_chunk_reader->has_next_page()) {
+            *loaded = false;
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(_chunk_reader->next_page());
+        *loaded = true;
+        *has_dict = _chunk_reader->has_dict();
+    }
+    return Status::OK();
+}
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                             ColumnSelectVector& select_vector, size_t batch_size,
-                                            size_t* read_rows, bool* eof) {
+                                            size_t* read_rows, bool* eof, bool is_dict_filter) {
     if (_chunk_reader->remaining_num_values() == 0) {
         if (!_chunk_reader->has_next_page()) {
             *eof = true;
@@ -398,7 +433,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     }
     if (_nested_column) {
         RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
-        return _read_nested_column(doris_column, type, select_vector, batch_size, read_rows, eof);
+        return _read_nested_column(doris_column, type, select_vector, batch_size, read_rows, eof,
+                                   is_dict_filter);
     }
     // generate the row ranges that should be read
@@ -452,7 +488,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     if (skip_whole_batch) {
         RETURN_IF_ERROR(_skip_values(read_values));
     } else {
-        RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector));
+        RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector,
+                                     is_dict_filter));
     }
     has_read += read_values;
     _current_row_index += read_values;
@@ -478,7 +515,7 @@ Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read
 Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                            ColumnSelectVector& select_vector, size_t batch_size,
-                                           size_t* read_rows, bool* eof) {
+                                           size_t* read_rows, bool* eof, bool is_dict_filter) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -499,7 +536,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
             const_cast<DataTypePtr&>(
                     (reinterpret_cast<const DataTypeArray*>(remove_nullable(type).get()))
                             ->get_nested_type()),
-            select_vector, batch_size, read_rows, eof));
+            select_vector, batch_size, read_rows, eof, is_dict_filter));
if (*read_rows == 0) { return Status::OK(); } @@ -523,7 +560,7 @@ Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader, Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof) { + size_t* read_rows, bool* eof, bool is_dict_filter) { MutableColumnPtr data_column; NullMap* null_map_ptr = nullptr; if (doris_column->is_nullable()) { @@ -553,10 +590,11 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t bool key_eof = false; bool value_eof = false; RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, select_vector, batch_size, - &key_rows, &key_eof)); + &key_rows, &key_eof, is_dict_filter)); select_vector.reset(); RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, select_vector, - batch_size, &value_rows, &value_eof)); + batch_size, &value_rows, &value_eof, + is_dict_filter)); DCHECK_EQ(key_rows, value_rows); DCHECK_EQ(key_eof, value_eof); *read_rows = key_rows; @@ -581,7 +619,7 @@ Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader> } Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof) { + size_t* read_rows, bool* eof, bool is_dict_filter) { MutableColumnPtr data_column; NullMap* null_map_ptr = nullptr; if (doris_column->is_nullable()) { @@ -609,7 +647,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr size_t loop_rows = 0; bool loop_eof = false; _child_readers[i]->read_column_data(doris_field, doris_type, select_vector, batch_size, - &loop_rows, &loop_eof); + &loop_rows, &loop_eof, is_dict_filter); if (i != 0) { DCHECK_EQ(*read_rows, loop_rows); DCHECK_EQ(*eof, loop_eof); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 292eb06fba..7f9764ec06 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -82,7 +82,21 @@ public: virtual ~ParquetColumnReader() = default; virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof) = 0; + size_t* read_rows, bool* eof, bool is_dict_filter) = 0; + + virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { + return Status::NotSupported("read_dict_values_to_column is not supported"); + } + + virtual Status get_dict_codes(const ColumnString* column_string, + std::vector<int32_t>* dict_codes) { + return Status::NotSupported("get_dict_codes is not supported"); + } + + virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { + LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported"; + } + static Status create(io::FileReaderSPtr file, FieldSchema* field, const tparquet::RowGroup& row_group, const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz, @@ -118,7 +132,11 @@ public: Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof) override; + bool* eof, bool is_dict_filter) override; + Status read_dict_values_to_column(MutableColumnPtr& 
doris_column, bool* has_dict) override; + Status get_dict_codes(const ColumnString* column_string, + std::vector<int32_t>* dict_codes) override; + MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } const std::vector<level_t>& get_def_level() const override { return _def_levels; } Statistics statistics() override { @@ -136,10 +154,11 @@ private: Status _skip_values(size_t num_values); Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector); + ColumnSelectVector& select_vector, bool is_dict_filter); Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof); + size_t* read_rows, bool* eof, bool is_dict_filter); + Status _try_load_dict_page(bool* loaded, bool* has_dict); }; class ArrayColumnReader : public ParquetColumnReader { @@ -150,7 +169,7 @@ public: Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof) override; + bool* eof, bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { return _element_reader->get_rep_level(); } @@ -174,7 +193,7 @@ public: std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof) override; + bool* eof, bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { return _key_reader->get_rep_level(); @@ -207,7 +226,7 @@ public: FieldSchema* field); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof) override; + bool* eof, bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { return _child_readers[0]->get_rep_level(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 71f77f3735..b48cd722b6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -17,9 +17,14 @@ #include "vparquet_group_reader.h" +#include "exprs/create_predicate_function.h" #include "schema_desc.h" #include "util/simd/bits.h" #include "vec/columns/column_const.h" +#include "vec/exprs/vdirect_in_predicate.h" +#include "vec/exprs/vectorized_fn_call.h" +#include "vec/exprs/vliteral.h" +#include "vec/exprs/vslot_ref.h" #include "vparquet_column_reader.h" namespace doris::vectorized { @@ -31,7 +36,7 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, const int32_t row_group_id, const tparquet::RowGroup& row_group, cctz::time_zone* ctz, const PositionDeleteContext& position_delete_ctx, - const LazyReadContext& lazy_read_ctx) + const LazyReadContext& lazy_read_ctx, RuntimeState* state) : _file_reader(file_reader), _read_columns(read_columns), _row_group_id(row_group_id), @@ -39,14 +44,35 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader, _remaining_rows(row_group.num_rows), _ctz(ctz), _position_delete_ctx(position_delete_ctx), - _lazy_read_ctx(lazy_read_ctx) {} + 
_lazy_read_ctx(lazy_read_ctx),
+          _state(state),
+          _obj_pool(new ObjectPool()) {}
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
+    for (auto* ctx : _dict_filter_conjuncts) {
+        if (ctx) {
+            ctx->close(_state);
+        }
+    }
+    _obj_pool->clear();
 }
-Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
-                            std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+Status RowGroupReader::init(
+        const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
+        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
+        const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const std::vector<VExprContext*>* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts) {
+    _tuple_descriptor = tuple_descriptor;
+    _row_descriptor = row_descriptor;
+    _col_name_to_slot_id = colname_to_slot_id;
+    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
+    if (not_single_slot_filter_conjuncts) {
+        _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
+                                 not_single_slot_filter_conjuncts->end());
+    }
     _merge_read_ranges(row_ranges);
     if (_read_columns.empty()) {
         // Query task that only selects columns in path.
@@ -71,11 +97,137 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
     }
+    // Check whether a single slot can be filtered by dict.
+    if (!_slot_id_to_filter_conjuncts) {
+        return Status::OK();
+    }
+    for (auto& predicate_col_name : _lazy_read_ctx.predicate_columns) {
+        auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
+        if (_can_filter_by_dict(predicate_col_name,
+                                _row_group_meta.columns[field->physical_column_index].meta_data)) {
+            _dict_filter_col_names.emplace_back(predicate_col_name);
+        } else {
+            int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+            if (_slot_id_to_filter_conjuncts->find(slot_id) !=
+                _slot_id_to_filter_conjuncts->end()) {
+                for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
+                    _filter_conjuncts.push_back(ctx);
+                }
+            }
+        }
+    }
+    RETURN_IF_ERROR(_rewrite_dict_predicates());
     return Status::OK();
 }
+bool RowGroupReader::_can_filter_by_dict(const string& predicate_col_name,
+                                         const tparquet::ColumnMetaData& column_metadata) {
+    SlotDescriptor* slot = nullptr;
+    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
+    int slot_id = _col_name_to_slot_id->at(predicate_col_name);
+    for (auto each : slots) {
+        if (each->id() == slot_id) {
+            slot = each;
+            break;
+        }
+    }
+    if (!slot->type().is_string_type()) {
+        return false;
+    }
+
+    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
+        return false;
+    }
+
+    if (!is_dictionary_encoded(column_metadata)) {
+        return false;
+    }
+
+    // TODO: handle exprs like 'a > 10 is null'; 'a > 10' alone should be filterable by dict.
+    for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
+        const VExpr* root_expr = ctx->root();
+        if (root_expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            std::string is_null_str;
+            std::string function_name = root_expr->fn().name.function_name;
+            if (function_name.compare("is_null_pred") == 0 ||
+                function_name.compare("is_not_null_pred") == 0) {
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+// This function is copied from
+// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
+bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
+    // The Parquet spec allows for column chunks to have mixed encodings
+    // where some data pages are dictionary-encoded and others are plain
+    // encoded. For example, a Parquet file writer might start writing
+    // a column chunk as dictionary encoded, but it will switch to plain
+    // encoding if the dictionary grows too large.
+    //
+    // In order for dictionary filters to skip the entire row group,
+    // the conjuncts must be evaluated on column chunks that are entirely
+    // encoded with the dictionary encoding. There are two checks
+    // available to verify this:
+    // 1. The encoding_stats field on the column chunk metadata provides
+    //    information about the number of data pages written in each
+    //    format. This allows for a specific check of whether all the
+    //    data pages are dictionary encoded.
+    // 2. The encodings field on the column chunk metadata lists the
+    //    encodings used. If this list contains the dictionary encoding
+    //    and does not include unexpected encodings (i.e. encodings not
+    //    associated with definition/repetition levels), then it is entirely
+    //    dictionary encoded.
+    if (column_metadata.__isset.encoding_stats) {
+        // Condition #1 above
+        for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
+            if (enc_stat.page_type == tparquet::PageType::DATA_PAGE &&
+                (enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+                 enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
+                enc_stat.count > 0) {
+                return false;
+            }
+        }
+    } else {
+        // Condition #2 above
+        bool has_dict_encoding = false;
+        bool has_nondict_encoding = false;
+        for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
+            if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
+                encoding == tparquet::Encoding::RLE_DICTIONARY) {
+                has_dict_encoding = true;
+            }
+
+            // RLE and BIT_PACKED are used for repetition/definition levels
+            if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+                encoding != tparquet::Encoding::RLE_DICTIONARY &&
+                encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
+                has_nondict_encoding = true;
+                break;
+            }
+        }
+        // Not entirely dictionary encoded if:
+        // 1. No dictionary encoding listed
+        // OR
+        // 2. Some non-dictionary encoding is listed
+        if (!has_dict_encoding || has_nondict_encoding) {
+            return false;
+        }
+    }
+
+    return true;
+}
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
                                   bool* batch_eof) {
+    if (_is_row_group_filtered) {
+        *read_rows = 0;
+        *batch_eof = true;
+        return Status::OK();
+    }
+
+    // Process external table query tasks whose selected columns all come from the path.
if (_read_columns.empty()) { RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof)); @@ -113,10 +265,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ columns_to_filter[i] = i; } if (_lazy_read_ctx.vconjunct_ctx != nullptr) { - int result_column_id = -1; - RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id)); - ColumnPtr& filter_column = block->get_by_position(result_column_id).column; - RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter)); + std::vector<IColumn::Filter*> filters; + if (_position_delete_ctx.has_filter) { + filters.push_back(_pos_delete_filter_ptr.get()); + } + RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(_filter_conjuncts, filters, block, + columns_to_filter, column_to_keep)); + _convert_dict_cols_to_string_cols(block); } else { RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)); } @@ -139,6 +294,25 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st auto& column_with_type_and_name = block->get_by_name(read_col); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; + auto col_iter = + std::find(_dict_filter_col_names.begin(), _dict_filter_col_names.end(), read_col); + bool is_dict_filter = false; + if (col_iter != _dict_filter_col_names.end()) { + MutableColumnPtr dict_column = ColumnVector<Int32>::create(); + size_t pos = block->get_position_by_name(read_col); + if (column_type->is_nullable()) { + block->get_by_position(pos).type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + block->replace_by_position( + pos, ColumnNullable::create(std::move(dict_column), + ColumnUInt8::create(dict_column->size(), 0))); + } else { + block->get_by_position(pos).type = std::make_shared<DataTypeInt32>(); + block->replace_by_position(pos, std::move(dict_column)); + } + is_dict_filter = true; + } + size_t col_read_rows = 0; bool col_eof = false; // Should reset _filter_map_index to 0 when reading next column. @@ -147,7 +321,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st size_t loop_rows = 0; RETURN_IF_ERROR(_column_readers[read_col]->read_column_data( column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows, - &col_eof)); + &col_eof, is_dict_filter)); col_read_rows += loop_rows; } if (batch_read_rows > 0 && batch_read_rows != col_read_rows) { @@ -169,7 +343,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re size_t pre_read_rows; bool pre_eof; size_t origin_column_num = block->columns(); - int filter_column_id = -1; + IColumn::Filter result_filter; while (true) { // read predicate columns pre_read_rows = 0; @@ -194,16 +368,21 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // The following process may be tricky and time-consuming, but we have no other way. 
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows); } - RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id)); - ColumnPtr& sv = block->get_by_position(filter_column_id).column; + result_filter.assign(pre_read_rows, static_cast<unsigned char>(1)); + bool can_filter_all = false; + std::vector<IColumn::Filter*> filters; + if (_position_delete_ctx.has_filter) { + filters.push_back(_pos_delete_filter_ptr.get()); + } + RETURN_IF_ERROR(_execute_conjuncts(_filter_conjuncts, filters, block, &result_filter, + &can_filter_all)); + if (_lazy_read_ctx.resize_first_column) { // We have to clean the first column to insert right data. block->get_by_position(0).column->assume_mutable()->clear(); } - // build filter map - bool can_filter_all = false; - const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all); + const uint8_t* __restrict filter_map = result_filter.data(); select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all)); if (select_vector_ptr->filter_all() && !pre_eof) { // If continuous batches are skipped, we can cache them to skip a whole page @@ -256,14 +435,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // generated from next batch, so the filter column is removed ahead. DCHECK_EQ(block->rows(), 0); } else { - const auto& filter_column = block->get_by_position(filter_column_id).column; - RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num, - _lazy_read_ctx.all_predicate_col_ids)); + RETURN_IF_ERROR(_filter_block_internal(block, _lazy_read_ctx.all_predicate_col_ids, + result_filter)); + Block::erase_useless_column(block, origin_column_num); } } else { Block::erase_useless_column(block, origin_column_num); } + _convert_dict_cols_to_string_cols(block); + size_t column_num = block->columns(); size_t column_size = 0; for (int i = 0; i < column_num; ++i) { @@ -283,55 +464,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns); } -const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows, - bool* can_filter_all) { - const uint8_t* filter_map = nullptr; - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*sv)) { - size_t column_size = nullable_column->size(); - if (column_size == 0) { - *can_filter_all = true; - } else { - DCHECK_EQ(column_size, num_rows); - const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); - ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>( - nullable_column->get_nested_column_ptr()->assume_mutable().get()); - auto* __restrict filter_data = concrete_column->get_data().data(); - if (_position_delete_ctx.has_filter) { - auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data(); - for (size_t i = 0; i < num_rows; ++i) { - filter_data[i] &= (!null_map_data[i]) & pos_delete_filter_data[i]; - } - } else { - for (size_t i = 0; i < num_rows; ++i) { - filter_data[i] &= (!null_map_data[i]); - } - } - filter_map = filter_data; - } - } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) { - // filter all - *can_filter_all = !const_column->get_bool(0); - } else { - MutableColumnPtr mutable_holder = sv->assume_mutable(); - ColumnUInt8* mutable_filter_column = typeid_cast<ColumnUInt8*>(mutable_holder.get()); - IColumn::Filter& filter = mutable_filter_column->get_data(); - auto* __restrict filter_data = 
filter.data(); - const size_t size = filter.size(); - - if (_position_delete_ctx.has_filter) { - auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data(); - for (size_t i = 0; i < size; ++i) { - filter_data[i] &= pos_delete_filter_data[i]; - } - } - filter_map = filter_data; - } - return filter_map; -} - void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector, std::unique_ptr<uint8_t[]>& filter_map, - size_t pre_read_rows) { + size_t pre_read_rows) const { if (_cached_filtered_rows == 0) { return; } @@ -493,73 +628,6 @@ Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) { return Status::OK(); } -Status RowGroupReader::_filter_block(Block* block, const ColumnPtr& filter_column, - int column_to_keep, std::vector<uint32_t> columns_to_filter) { - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { - const auto& nested_column = nullable_column->get_nested_column_ptr(); - - MutableColumnPtr mutable_holder = - nested_column->use_count() == 1 - ? nested_column->assume_mutable() - : nested_column->clone_resized(nested_column->size()); - - ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(mutable_holder.get()); - if (!concrete_column) { - return Status::InvalidArgument( - "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).", - filter_column->get_name()); - } - auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); - IColumn::Filter& filter = concrete_column->get_data(); - auto* __restrict filter_data = filter.data(); - const size_t size = filter.size(); - - if (_position_delete_ctx.has_filter) { - auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data(); - for (size_t i = 0; i < size; ++i) { - filter_data[i] &= (!null_map_data[i]) & pos_delete_filter_data[i]; - } - } else { - for (size_t i = 0; i < size; ++i) { - filter_data[i] &= (!null_map_data[i]); - } - } - RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, filter)); - } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { - bool ret = const_column->get_bool(0); - if (!ret) { - for (auto& col : columns_to_filter) { - std::move(*block->get_by_position(col).column).assume_mutable()->clear(); - } - } - } else { - MutableColumnPtr mutable_holder = - filter_column->use_count() == 1 - ? filter_column->assume_mutable() - : filter_column->clone_resized(filter_column->size()); - ColumnUInt8* mutable_filter_column = typeid_cast<ColumnUInt8*>(mutable_holder.get()); - if (!mutable_filter_column) { - return Status::InvalidArgument( - "Illegal type {} of column for filter. 
Must be UInt8 or Nullable(UInt8).", - filter_column->get_name()); - } - - IColumn::Filter& filter = mutable_filter_column->get_data(); - auto* __restrict filter_data = filter.data(); - - if (_position_delete_ctx.has_filter) { - auto* __restrict pos_delete_filter_data = _pos_delete_filter_ptr->data(); - const size_t size = filter.size(); - for (size_t i = 0; i < size; ++i) { - filter_data[i] &= pos_delete_filter_data[i]; - } - } - RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, filter)); - } - Block::erase_useless_column(block, column_to_keep); - return Status::OK(); -} - Status RowGroupReader::_filter_block(Block* block, int column_to_keep, const std::vector<uint32_t>& columns_to_filter) { if (_pos_delete_filter_ptr) { @@ -599,6 +667,233 @@ Status RowGroupReader::_filter_block_internal(Block* block, return Status::OK(); } +Status RowGroupReader::_rewrite_dict_predicates() { + for (vector<std::string>::iterator it = _dict_filter_col_names.begin(); + it != _dict_filter_col_names.end();) { + std::string& dict_filter_col_name = *it; + int slot_id = _col_name_to_slot_id->at(dict_filter_col_name); + // 1. Get dictionary values to a string column. + MutableColumnPtr dict_value_column = ColumnString::create(); + bool has_dict = false; + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column( + dict_value_column, &has_dict)); + size_t dict_value_column_size = dict_value_column->size(); + DCHECK(has_dict); + // 2. Build a temp block from the dict string column, then execute conjuncts and filter block. + // 2.1 Build a temp block from the dict string column to match the conjuncts executing. + Block temp_block; + int dict_pos = -1; + int index = 0; + for (const auto slot_desc : _tuple_descriptor->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignored from reading + continue; + } + if (slot_desc->id() == slot_id) { + auto data_type = slot_desc->get_data_type_ptr(); + if (data_type->is_nullable()) { + temp_block.insert( + {ColumnNullable::create(std::move(dict_value_column), + ColumnUInt8::create(dict_value_column_size, 0)), + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), + ""}); + } else { + temp_block.insert( + {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""}); + } + dict_pos = index; + + } else { + temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + ++index; + } + + // 2.2 Execute conjuncts and filter block. + const std::vector<VExprContext*>* ctxs = nullptr; + auto iter = _slot_id_to_filter_conjuncts->find(slot_id); + if (iter != _slot_id_to_filter_conjuncts->end()) { + ctxs = &(iter->second); + } else { + std::stringstream msg; + msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found"; + return Status::NotFound(msg.str()); + } + + std::vector<uint32_t> columns_to_filter(1, dict_pos); + int column_to_keep = temp_block.columns(); + if (dict_pos != 0) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. + temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); + } + std::vector<IColumn::Filter*> filters; + RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, &temp_block, + columns_to_filter, column_to_keep)); + if (dict_pos != 0) { + // We have to clean the first column to insert right data. 
+ temp_block.get_by_position(0).column->assume_mutable()->clear(); + } + + // Check the size of the filtered dictionary. + ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column; + // If dict_column->size() == 0, the whole row group can be filtered out. + if (dict_column->size() == 0) { + _is_row_group_filtered = true; + return Status::OK(); + } + + // Performance: if the filtered dict column is still too large, rewriting would generate a huge IN filter, so fall back to the original conjuncts. + if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { + for (auto& ctx : (*ctxs)) { + _filter_conjuncts.push_back(ctx); + } + it = _dict_filter_col_names.erase(it); + continue; + } + + // 3. Get dict codes. + std::vector<int32_t> dict_codes; + if (dict_column->is_nullable()) { + const ColumnNullable* nullable_column = + static_cast<const ColumnNullable*>(dict_column.get()); + const ColumnString* nested_column = static_cast<const ColumnString*>( + nullable_column->get_nested_column_ptr().get()); + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( + assert_cast<const ColumnString*>(nested_column), &dict_codes)); + } else { + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( + assert_cast<const ColumnString*>(dict_column.get()), &dict_codes)); + } + + // 4. Rewrite conjuncts. + RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id)); + ++it; + } + return Status::OK(); +} + +Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id) { + VExpr* root; + if (dict_codes.size() == 1) { + { + TFunction fn; + TFunctionName fn_name; + fn_name.__set_db_name(""); + fn_name.__set_function_name("eq"); + fn.__set_name(fn_name); + fn.__set_binary_type(TFunctionBinaryType::BUILTIN); + std::vector<TTypeDesc> arg_types; + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + fn.__set_arg_types(arg_types); + fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_has_var_args(false); + + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::BINARY_PRED); + texpr_node.__set_opcode(TExprOpcode::EQ); + texpr_node.__set_vector_opcode(TExprOpcode::EQ); + texpr_node.__set_fn(fn); + texpr_node.__set_child_type(TPrimitiveType::INT); + texpr_node.__set_num_children(2); + root = _obj_pool->add(new VectorizedFnCall(texpr_node)); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot)); + root->add_child(slot_ref_expr); + } + { + TExprNode texpr_node; + texpr_node.__set_node_type(TExprNodeType::INT_LITERAL); + texpr_node.__set_type(create_type_desc(TYPE_INT)); + TIntLiteral int_literal; + int_literal.__set_value(dict_codes[0]); + texpr_node.__set_int_literal(int_literal); + VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node)); + root->add_child(literal_expr); + } + } else { + { + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__isset.vector_opcode = true; + node.__set_vector_opcode(TExprOpcode::FILTER_IN); + + root = _obj_pool->add(new vectorized::VDirectInPredicate(node)); + std::shared_ptr<HybridSetBase> 
hybrid_set(create_set(PrimitiveType::TYPE_INT)); + for (int j = 0; j < dict_codes.size(); ++j) { + hybrid_set->insert(&dict_codes[j]); + } + static_cast<vectorized::VDirectInPredicate*>(root)->set_filter(hybrid_set); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot)); + root->add_child(slot_ref_expr); + } + } + VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new VExprContext(root)); + RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor)); + RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state)); + _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx); + _filter_conjuncts.push_back(rewritten_conjunct_ctx); + return Status::OK(); +} + +void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { + for (auto& dict_filter_col_name : _dict_filter_col_names) { + size_t pos = block->get_position_by_name(dict_filter_col_name); + ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); + const ColumnPtr& column = column_with_type_and_name.column; + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(nested_column.get()); + DCHECK(dict_column); + + MutableColumnPtr string_column = + _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column( + dict_column); + + column_with_type_and_name.type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + block->replace_by_position( + pos, ColumnNullable::create(std::move(string_column), + nullable_column->get_null_map_column_ptr())); + } else { + const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get()); + MutableColumnPtr string_column = + _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column( + dict_column); + + column_with_type_and_name.type = std::make_shared<DataTypeString>(); + block->replace_by_position(pos, std::move(string_column)); + } + } +} + ParquetColumnReader::Statistics RowGroupReader::statistics() { ParquetColumnReader::Statistics st; for (auto& reader : _column_readers) { @@ -608,4 +903,86 @@ ParquetColumnReader::Statistics RowGroupReader::statistics() { return st; } +// TODO Performance Optimization +Status RowGroupReader::_execute_conjuncts(const std::vector<VExprContext*>& ctxs, + const std::vector<IColumn::Filter*>& filters, + Block* block, IColumn::Filter* result_filter, + bool* can_filter_all) { + *can_filter_all = false; + auto* __restrict result_filter_data = result_filter->data(); + for (auto* ctx : ctxs) { + int result_column_id = -1; + RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); + ColumnPtr& filter_column = block->get_by_position(result_column_id).column; + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + size_t column_size = nullable_column->size(); + if (column_size == 0) { + *can_filter_all = true; + return Status::OK(); + } else { + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const IColumn::Filter& filter = + assert_cast<const ColumnUInt8&>(*nested_column).get_data(); + auto* __restrict filter_data = filter.data(); + const size_t size = filter.size(); + auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data(); + + for (size_t i = 0; i < size; ++i) { + result_filter_data[i] &= (!null_map_data[i]) & filter_data[i]; + } + if (memchr(filter_data, 0x1, size) == nullptr) { + *can_filter_all = true; + return Status::OK(); + } + } + } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { + // A constant false value filters out all rows. + if (!const_column->get_bool(0)) { + *can_filter_all = true; + return Status::OK(); + } + } else { + const IColumn::Filter& filter = + assert_cast<const ColumnUInt8&>(*filter_column).get_data(); + auto* __restrict filter_data = filter.data(); + + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + result_filter_data[i] &= filter_data[i]; + } + + if (memchr(filter_data, 0x1, size) == nullptr) { + *can_filter_all = true; + return Status::OK(); + } + } + } + for (auto* filter : filters) { + auto* __restrict filter_data = filter->data(); + const size_t size = filter->size(); + for (size_t i = 0; i < size; ++i) { + result_filter_data[i] &= filter_data[i]; + } + } + return Status::OK(); +} + +// TODO Performance Optimization +Status RowGroupReader::_execute_conjuncts_and_filter_block( + const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>& filters, + Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep) { + IColumn::Filter result_filter(block->rows(), 1); + bool can_filter_all; + RETURN_IF_ERROR(_execute_conjuncts(ctxs, filters, block, &result_filter, &can_filter_all)); + if (can_filter_all) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + } else { + RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, result_filter)); + } + Block::erase_useless_column(block, column_to_keep); + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index cd1f92a909..99cb6cb85a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -25,6 +25,9 @@ namespace doris::vectorized { +// TODO: this threshold needs to be determined by testing. 
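+// If the filtered dictionary of a column still holds more codes than this threshold, the +// dict-filter rewrite is skipped for that column and the original conjuncts are evaluated +// instead; the current value effectively disables the cap. 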
+static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max(); + class RowGroupReader { public: static const std::vector<int64_t> NO_DELETE; @@ -103,11 +106,16 @@ public: const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id, const tparquet::RowGroup& row_group, cctz::time_zone* ctz, const PositionDeleteContext& position_delete_ctx, - const LazyReadContext& lazy_read_ctx); + const LazyReadContext& lazy_read_ctx, RuntimeState* state); ~RowGroupReader(); - Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, - std::unordered_map<int, tparquet::OffsetIndex>& col_offsets); + Status init( + const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, + std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, + const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const std::unordered_map<std::string, int>* colname_to_slot_id, + const std::vector<VExprContext*>* not_single_slot_filter_conjuncts, + const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts); Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } @@ -120,9 +128,8 @@ private: size_t batch_size, size_t* read_rows, bool* batch_eof, ColumnSelectVector& select_vector); Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); - const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all); void _rebuild_select_vector(ColumnSelectVector& select_vector, - std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows); + std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows) const; Status _fill_partition_columns( Block* block, size_t rows, const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& @@ -131,13 +138,26 @@ private: Block* block, size_t rows, const std::unordered_map<std::string, VExprContext*>& missing_columns); Status _build_pos_delete_filter(size_t read_rows); - Status _filter_block(Block* block, const ColumnPtr& filter_column, int column_to_keep, - std::vector<uint32_t> columns_to_filter); Status _filter_block(Block* block, int column_to_keep, const vector<uint32_t>& columns_to_filter); Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter, const IColumn::Filter& filter); + bool _can_filter_by_dict(const string& predicate_col_name, + const tparquet::ColumnMetaData& column_metadata); + bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata); + Status _rewrite_dict_predicates(); + Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id); + void _convert_dict_cols_to_string_cols(Block* block); + Status _execute_conjuncts(const std::vector<VExprContext*>& ctxs, + const std::vector<IColumn::Filter*>& filters, Block* block, + IColumn::Filter* result_filter, bool* can_filter_all); + Status _execute_conjuncts_and_filter_block(const std::vector<VExprContext*>& ctxs, + const std::vector<IColumn::Filter*>& filters, + Block* block, + std::vector<uint32_t>& columns_to_filter, + int column_to_keep); + io::FileReaderSPtr _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; const std::vector<ParquetReadColumn>& _read_columns; @@ -156,5 +176,15 @@ private: std::unique_ptr<TextConverter> _text_converter = nullptr; std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = 
nullptr; int64_t _total_read_rows = 0; + const TupleDescriptor* _tuple_descriptor; + const RowDescriptor* _row_descriptor; + const std::unordered_map<std::string, int>* _col_name_to_slot_id; + const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts; + std::vector<VExprContext*> _dict_filter_conjuncts; + std::vector<VExprContext*> _filter_conjuncts; + std::vector<std::string> _dict_filter_col_names; + RuntimeState* _state; + std::shared_ptr<ObjectPool> _obj_pool; + bool _is_row_group_filtered = false; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index c61f60ad89..7c68f1925a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -34,7 +34,7 @@ namespace doris::vectorized { ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz, - IOContext* io_ctx) + IOContext* io_ctx, RuntimeState* state) : _profile(profile), _scan_params(params), _scan_range(range), @@ -42,15 +42,20 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams _range_start_offset(range.start_offset), _range_size(range.size), _ctz(ctz), - _io_ctx(io_ctx) { + _io_ctx(io_ctx), + _state(state) { _init_profile(); _init_system_properties(); _init_file_description(); } ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, - IOContext* io_ctx) - : _profile(nullptr), _scan_params(params), _scan_range(range), _io_ctx(io_ctx) { + IOContext* io_ctx, RuntimeState* state) + : _profile(nullptr), + _scan_params(params), + _scan_range(range), + _io_ctx(io_ctx), + _state(state) { _init_system_properties(); _init_file_description(); } @@ -195,7 +200,17 @@ Status ParquetReader::init_reader( const std::vector<std::string>& all_column_names, const std::vector<std::string>& missing_column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - VExprContext* vconjunct_ctx, bool filter_groups) { + VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map<std::string, int>* colname_to_slot_id, + const std::vector<VExprContext*>* not_single_slot_filter_conjuncts, + const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts, + bool filter_groups) { + _tuple_descriptor = tuple_descriptor; + _row_descriptor = row_descriptor; + _colname_to_slot_id = colname_to_slot_id; + _not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts; + _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; if (_file_metadata == nullptr) { return Status::InternalError("failed to init parquet reader, please open reader first"); } @@ -467,10 +482,12 @@ Status ParquetReader::_next_row_group_reader() { _get_position_delete_ctx(row_group, row_group_index); _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, - position_delete_ctx, _lazy_read_ctx)); + position_delete_ctx, _lazy_read_ctx, _state)); _row_group_eof = false; - return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, - _col_offsets); + return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets, + _tuple_descriptor, _row_descriptor, _colname_to_slot_id, + 
_not_single_slot_filter_conjuncts, + _slot_id_to_filter_conjuncts); } Status ParquetReader::_init_row_groups(const bool& is_filter_groups) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 10e04bdabb..f21996a1df 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -58,10 +58,10 @@ public: ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz, - IOContext* io_ctx); + IOContext* io_ctx, RuntimeState* state); ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, - IOContext* io_ctx); + IOContext* io_ctx, RuntimeState* state); ~ParquetReader() override; // for test @@ -73,7 +73,12 @@ public: const std::vector<std::string>& all_column_names, const std::vector<std::string>& missing_column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - VExprContext* vconjunct_ctx, bool filter_groups = true); + VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map<std::string, int>* colname_to_slot_id, + const std::vector<VExprContext*>* not_single_slot_filter_conjuncts, + const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts, + bool filter_groups = true); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; @@ -204,5 +209,11 @@ private: ParquetProfile _parquet_profile; bool _closed = false; IOContext* _io_ctx; + RuntimeState* _state; + const TupleDescriptor* _tuple_descriptor; + const RowDescriptor* _row_descriptor; + const std::unordered_map<std::string, int>* _colname_to_slot_id; + const std::vector<VExprContext*>* _not_single_slot_filter_conjuncts; + const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index ce36da6a92..353b3bb198 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -63,7 +63,11 @@ Status IcebergTableReader::init_reader( const std::vector<std::string>& file_col_names, const std::unordered_map<int, std::string>& col_id_name_map, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - VExprContext* vconjunct_ctx) { + VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map<std::string, int>* colname_to_slot_id, + const std::vector<VExprContext*>* not_single_slot_filter_conjuncts, + const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts) { ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get()); _col_id_name_map = col_id_name_map; _file_col_names = file_col_names; @@ -73,8 +77,10 @@ Status IcebergTableReader::init_reader( _gen_file_col_names(); _gen_new_colname_to_value_range(); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); - Status status = parquet_reader->init_reader(_all_required_col_names, _not_in_file_col_names, - &_new_colname_to_value_range, vconjunct_ctx); + Status status = parquet_reader->init_reader( + _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, + vconjunct_ctx, tuple_descriptor, row_descriptor, 
colname_to_slot_id, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); return status; } @@ -181,7 +187,7 @@ Status IcebergTableReader::_position_delete( delete_range.file_size = -1; ParquetReader delete_reader(_profile, _params, delete_range, 102400, const_cast<cctz::time_zone*>(&_state->timezone_obj()), - _io_ctx); + _io_ctx, _state); if (!init_schema) { delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types); init_schema = true; @@ -191,6 +197,7 @@ Status IcebergTableReader::_position_delete( return nullptr; } create_status = delete_reader.init_reader(delete_file_col_names, _not_in_file_col_names, + nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, false); if (!create_status.ok()) { return nullptr; diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 0f3343e3ca..3e57eda86b 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -63,7 +63,11 @@ public: const std::vector<std::string>& file_col_names, const std::unordered_map<int, std::string>& col_id_name_map, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - VExprContext* vconjunct_ctx); + VExprContext* vconjunct_ctx, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map<std::string, int>* colname_to_slot_id, + const std::vector<VExprContext*>* not_single_slot_filter_conjuncts, + const std::unordered_map<int, std::vector<VExprContext*>>* slot_id_to_filter_conjuncts); enum { DATA, POSITION_DELETE, EQUALITY_DELETE }; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 50b54f1691..f6b3c98573 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -93,7 +93,8 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) { runtime_profile(), _kv_cache); _scanner_pool.add(scanner); RETURN_IF_ERROR(((VFileScanner*)scanner) - ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range)); + ->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range, + &_colname_to_slot_id)); scanners->push_back(scanner); } diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index fbc49940bc..aaa0ec9f36 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -36,6 +36,7 @@ #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exprs/vslot_ref.h" #include "vec/functions/simple_function_factory.h" namespace doris::vectorized { @@ -59,9 +60,11 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t Status VFileScanner::prepare( VExprContext** vconjunct_ctx_ptr, - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, + const std::unordered_map<std::string, int>* colname_to_slot_id) { RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); _colname_to_value_range = colname_to_value_range; + _col_name_to_slot_id = colname_to_slot_id; _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime"); _cast_to_input_block_timer = @@ -101,6 +104,59 @@ Status VFileScanner::prepare( return Status::OK(); } +Status VFileScanner::_split_conjuncts(VExpr* 
conjunct_expr_root) { + static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); }; + if (conjunct_expr_root != nullptr) { + if (is_leaf(conjunct_expr_root)) { + auto impl = conjunct_expr_root->get_impl(); + // If impl is not null, this conjunct comes from a runtime filter. + VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root; + VExprContext* new_ctx = _state->obj_pool()->add(new VExprContext(cur_expr)); + _vconjunct_ctx->clone_fn_contexts(new_ctx); + RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc)); + RETURN_IF_ERROR(new_ctx->open(_state)); + + std::vector<int> slot_ids; + _get_slot_ids(cur_expr, &slot_ids); + if (slot_ids.size() == 0) { + _not_single_slot_filter_conjuncts.emplace_back(new_ctx); + return Status::OK(); + } + bool single_slot = true; + for (int i = 1; i < slot_ids.size(); i++) { + if (slot_ids[i] != slot_ids[0]) { + single_slot = false; + break; + } + } + if (single_slot) { + SlotId slot_id = slot_ids[0]; + if (_slot_id_to_filter_conjuncts.find(slot_id) == + _slot_id_to_filter_conjuncts.end()) { + _slot_id_to_filter_conjuncts.insert({slot_id, std::vector<VExprContext*>()}); + } + _slot_id_to_filter_conjuncts[slot_id].emplace_back(new_ctx); + } else { + _not_single_slot_filter_conjuncts.emplace_back(new_ctx); + } + } else { + RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[0])); + RETURN_IF_ERROR(_split_conjuncts(conjunct_expr_root->children()[1])); + } + } + return Status::OK(); +} + +void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) { + for (VExpr* child_expr : expr->children()) { + if (child_expr->is_slot_ref()) { + VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr); + slot_ids->emplace_back(slot_ref->slot_id()); + } + _get_slot_ids(child_expr, slot_ids); + } +} + Status VFileScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); RETURN_IF_ERROR(_init_expr_ctxes()); @@ -481,7 +537,7 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_PARQUET: { ParquetReader* parquet_reader = new ParquetReader( _profile, _params, range, _state->query_options().batch_size, - const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get()); + const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state); RETURN_IF_ERROR(parquet_reader->open()); if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) { RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr)); @@ -492,14 +548,18 @@ Status VFileScanner::_get_next_reader() { IcebergTableReader* iceberg_reader = new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state, _params, range, _kv_cache, _io_ctx.get()); - init_status = iceberg_reader->init_reader(_file_col_names, _col_id_name_map, - _colname_to_value_range, _push_down_expr); + init_status = iceberg_reader->init_reader( + _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_expr, + _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader.reset((GenericReader*)iceberg_reader); } else { std::vector<std::string> place_holder; - init_status = parquet_reader->init_reader(_file_col_names, place_holder, - _colname_to_value_range, _push_down_expr); + init_status = parquet_reader->init_reader( + _file_col_names, place_holder, _colname_to_value_range, _push_down_expr, + _real_tuple_desc, 
_default_val_row_desc.get(), _col_name_to_slot_id, + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader.reset((GenericReader*)parquet_reader); } break; @@ -749,6 +809,20 @@ Status VFileScanner::close(RuntimeState* state) { _push_down_expr->close(state); } + for (auto& [k, v] : _slot_id_to_filter_conjuncts) { + for (auto& ctx : v) { + if (ctx != nullptr) { + ctx->close(state); + } + } + } + + for (auto* ctx : _not_single_slot_filter_conjuncts) { + if (ctx != nullptr) { + ctx->close(state); + } + } + if (config::enable_file_cache) { io::FileCacheProfileReporter cache_profile(_profile); cache_profile.update(_file_cache_statistics.get()); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 4e503da5ef..410d357d18 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -42,7 +42,8 @@ public: public: Status prepare(VExprContext** vconjunct_ctx_ptr, - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, + const std::unordered_map<std::string, int>* colname_to_slot_id); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; @@ -106,6 +107,7 @@ protected: int _rows = 0; int _num_of_columns_from_file; + bool _src_block_mem_reuse = false; bool _strict_mode; bool _src_block_init = false; @@ -114,6 +116,8 @@ protected: VExprContext* _push_down_expr = nullptr; bool _is_dynamic_schema = false; + // for tracing dynamic schema + std::unique_ptr<vectorized::schema_util::FullBaseSchemaView> _full_base_schema_view; std::unique_ptr<FileCacheStatistics> _file_cache_statistics; std::unique_ptr<IOContext> _io_ctx; @@ -126,6 +130,12 @@ private: RuntimeProfile::Counter* _pre_filter_timer = nullptr; RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; + const std::unordered_map<std::string, int>* _col_name_to_slot_id; + // filter conjuncts that reference exactly one slot + std::unordered_map<int, std::vector<VExprContext*>> _slot_id_to_filter_conjuncts; + // filter conjuncts that reference zero or multiple slots + std::vector<VExprContext*> _not_single_slot_filter_conjuncts; + private: Status _init_expr_ctxes(); Status _init_src_block(Block* block); @@ -135,6 +145,9 @@ private: Status _pre_filter_src_block(); Status _convert_to_output_block(Block* block); Status _generate_fill_columns(); + Status _handle_dynamic_block(Block* block); + Status _split_conjuncts(VExpr* conjunct_expr_root); + void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids); void _reset_counter() { _counter.num_rows_unselected = 0; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index f019df4b60..f8fa213d33 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -447,6 +447,8 @@ Status VScanNode::_normalize_conjuncts() { std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots(); for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { + _colname_to_slot_id[slots[slot_idx]->col_name()] = slots[slot_idx]->id(); + auto type = slots[slot_idx]->type().type; if (slots[slot_idx]->type().type == TYPE_ARRAY) { type = slots[slot_idx]->type().children[0].type; diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 51dc2edb1b..de641e6055 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -311,6 +311,8 @@ protected: RuntimeProfile::HighWaterMarkCounter* 
_queued_blocks_memory_usage; RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage; + std::unordered_map<std::string, int> _colname_to_slot_id; + private: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(); diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 42ff7cf538..4ca7e55ecd 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -108,7 +108,8 @@ TEST_F(ParquetReaderTest, normal) { scan_range.start_offset = 0; scan_range.size = 1000; } - auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr); + auto p_reader = + new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr); p_reader->set_file_reader(reader); RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); @@ -116,7 +117,8 @@ TEST_F(ParquetReaderTest, normal) { std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range; p_reader->open(); - p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr); + p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr, nullptr, nullptr, + nullptr, nullptr, nullptr); std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> partition_columns; std::unordered_map<std::string, VExprContext*> missing_columns; diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index c7e4894d20..9426505489 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -191,7 +191,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column // required column std::vector<u_short> null_map = {(u_short)rows}; run_length_map.set_run_length_null_map(null_map, rows, nullptr); - return chunk_reader.decode_values(data_column, data_type, run_length_map); + return chunk_reader.decode_values(data_column, data_type, run_length_map, false); } else { // column with null values level_t level_type = definitions[0]; @@ -204,8 +204,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column } else { std::vector<u_short> null_map = {(u_short)num_values}; run_length_map.set_run_length_null_map(null_map, rows, nullptr); - RETURN_IF_ERROR( - chunk_reader.decode_values(data_column, data_type, run_length_map)); + RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type, + run_length_map, false)); } level_type = definitions[i]; num_values = 1; @@ -219,7 +219,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column } else { std::vector<u_short> null_map = {(u_short)num_values}; run_length_map.set_run_length_null_map(null_map, rows, nullptr); - RETURN_IF_ERROR(chunk_reader.decode_values(data_column, data_type, run_length_map)); + RETURN_IF_ERROR( + chunk_reader.decode_values(data_column, data_type, run_length_map, false)); } return Status::OK(); } @@ -421,12 +422,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) { std::shared_ptr<RowGroupReader> row_group_reader; RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0); row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz, - position_delete_ctx, lazy_read_ctx)); + position_delete_ctx, lazy_read_ctx, nullptr)); std::vector<RowRange> row_ranges; row_ranges.emplace_back(0, row_group.num_rows); auto 
col_offsets = std::unordered_map<int, tparquet::OffsetIndex>(); - auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets); + auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets, nullptr, + nullptr, nullptr, nullptr, nullptr); EXPECT_TRUE(stg.ok()); vectorized::Block block;