This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8df93f8dfec [Opt](parquet/orc-reader) Opt get dict ids in _rewrite_dict_predicates() (#40095) 8df93f8dfec is described below commit 8df93f8dfec1e5a0a697be2147e22913079a4701 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Thu Aug 29 14:50:42 2024 +0800 [Opt](parquet/orc-reader) Opt get dict ids in _rewrite_dict_predicates() (#40095) ## Proposed changes backport #39893. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 49 ++++++++-------------- .../format/parquet/byte_array_dict_decoder.cpp | 11 ----- .../exec/format/parquet/byte_array_dict_decoder.h | 4 -- be/src/vec/exec/format/parquet/decoder.h | 5 --- .../format/parquet/fix_length_dict_decoder.hpp | 14 ------- .../format/parquet/vparquet_column_chunk_reader.h | 5 --- .../exec/format/parquet/vparquet_column_reader.cpp | 5 --- .../exec/format/parquet/vparquet_column_reader.h | 7 ---- .../exec/format/parquet/vparquet_group_reader.cpp | 43 ++++++++----------- 9 files changed, 36 insertions(+), 107 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 4bc52d76959..d67602d39f6 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -2054,7 +2054,6 @@ Status OrcReader::on_string_dicts_loaded( orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second; std::vector<StringRef> dict_values; - std::unordered_map<StringRef, int64_t> dict_value_to_code; size_t max_value_length = 0; uint64_t dictionaryCount = dict->dictionaryOffset.size() - 1; if (dictionaryCount == 0) { @@ -2074,7 +2073,6 @@ Status OrcReader::on_string_dicts_loaded( max_value_length = length; } dict_values.emplace_back(dict_value); - dict_value_to_code[dict_value] = i; } dict_value_column->insert_many_strings_overflow(&dict_values[0], dict_values.size(), max_value_length); @@ -2113,31 +2111,37 @@ Status OrcReader::on_string_dicts_loaded( ++index; } - // 2.2 Execute conjuncts and filter block. - std::vector<uint32_t> columns_to_filter(1, dict_pos); - int column_to_keep = temp_block.columns(); + // 2.2 Execute conjuncts. if (dict_pos != 0) { // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 // The following process may be tricky and time-consuming, but we have no other way. temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep))); + IColumn::Filter result_filter(temp_block.rows(), 1); + bool can_filter_all; + RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, &result_filter, + &can_filter_all)); if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); } - // Check some conditions. - ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column; - // If dict_column->size() == 0, can filter this stripe. - if (dict_column->size() == 0) { + // If can_filter_all = true, can filter this stripe. + if (can_filter_all) { *is_stripe_filtered = true; return Status::OK(); } + // 3. Get dict codes. + std::vector<int32_t> dict_codes; + for (size_t i = 0; i < result_filter.size(); ++i) { + if (result_filter[i]) { + dict_codes.emplace_back(i); + } + } + // About Performance: if dict_column size is too large, it will generate a large IN filter. - if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { + if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { it = _dict_filter_cols.erase(it); for (auto& ctx : ctxs) { _non_dict_filter_conjuncts.emplace_back(ctx); @@ -2145,26 +2149,9 @@ Status OrcReader::on_string_dicts_loaded( continue; } - // 3. Get dict codes. - std::vector<int32_t> dict_codes; - if (dict_column->is_nullable()) { - const ColumnNullable* nullable_column = - static_cast<const ColumnNullable*>(dict_column.get()); - const ColumnString* nested_column = static_cast<const ColumnString*>( - nullable_column->get_nested_column_ptr().get()); - for (int i = 0; i < nested_column->size(); ++i) { - StringRef dict_value = nested_column->get_data_at(i); - dict_codes.emplace_back(dict_value_to_code[dict_value]); - } - } else { - for (int i = 0; i < dict_column->size(); ++i) { - StringRef dict_value = dict_column->get_data_at(i); - dict_codes.emplace_back(dict_value_to_code[dict_value]); - } - } - // 4. Rewrite conjuncts. - RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts( + dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable())); ++it; } return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp index b6a614831a3..82bb234fd39 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp @@ -44,7 +44,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t total_length += l; } - _dict_value_to_code.reserve(num_values); // For insert_many_strings_overflow _dict_data.resize(total_length + ColumnString::MAX_STRINGS_OVERFLOW_SIZE); _max_value_length = 0; @@ -55,7 +54,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t offset_cursor += 4; memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l); _dict_items.emplace_back(&_dict_data[offset], l); - _dict_value_to_code[StringRef(&_dict_data[offset], l)] = i; offset_cursor += l; offset += l; if (offset_cursor > length) { @@ -77,15 +75,6 @@ Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_ return Status::OK(); } -Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column, - std::vector<int32_t>* dict_codes) { - for (int i = 0; i < string_column->size(); ++i) { - StringRef dict_value = string_column->get_data_at(i); - dict_codes->emplace_back(_dict_value_to_code[dict_value]); - } - return Status::OK(); -} - MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column( const ColumnInt32* dict_column) { auto res = ColumnString::create(); diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h index 744a62165fb..bb83d41813b 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h @@ -54,9 +54,6 @@ public: Status read_dict_values_to_column(MutableColumnPtr& doris_column) override; - Status get_dict_codes(const ColumnString* column_string, - std::vector<int32_t>* dict_codes) override; - MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; protected: @@ -64,6 +61,5 @@ protected: std::vector<StringRef> _dict_items; std::vector<uint8_t> _dict_data; size_t _max_value_length; - std::unordered_map<StringRef, int32_t> _dict_value_to_code; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h index 57fecf4abfb..1654878af80 100644 --- a/be/src/vec/exec/format/parquet/decoder.h +++ b/be/src/vec/exec/format/parquet/decoder.h @@ -78,11 +78,6 @@ public: return Status::NotSupported("read_dict_values_to_column is not supported"); } - virtual Status get_dict_codes(const ColumnString* column_string, - std::vector<int32_t>* dict_codes) { - return Status::NotSupported("get_dict_codes is not supported"); - } - virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported"; __builtin_unreachable(); diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp index e409c664d3e..2886696877f 100644 --- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp +++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp @@ -109,10 +109,8 @@ protected: _dict = std::move(dict); char* dict_item_address = reinterpret_cast<char*>(_dict.get()); _dict_items.resize(num_values); - _dict_value_to_code.reserve(num_values); for (size_t i = 0; i < num_values; ++i) { _dict_items[i] = dict_item_address; - _dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i; dict_item_address += _type_length; } return Status::OK(); @@ -128,17 +126,6 @@ protected: return Status::OK(); } - Status get_dict_codes(const ColumnString* string_column, - std::vector<int32_t>* dict_codes) override { - size_t size = string_column->size(); - dict_codes->reserve(size); - for (int i = 0; i < size; ++i) { - StringRef dict_value = string_column->get_data_at(i); - dict_codes->emplace_back(_dict_value_to_code[dict_value]); - } - return Status::OK(); - } - MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override { auto res = ColumnString::create(); std::vector<StringRef> dict_values(dict_column->size()); @@ -149,7 +136,6 @@ protected: res->insert_many_strings(&dict_values[0], dict_values.size()); return res; } - std::unordered_map<StringRef, int32_t> _dict_value_to_code; // For dictionary encoding std::vector<char*> _dict_items; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 79ee3cd6463..a00a4683725 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -183,11 +183,6 @@ public: ->read_dict_values_to_column(doris_column); } - Status get_dict_codes(const ColumnString* column_string, std::vector<int32_t>* dict_codes) { - return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes( - column_string, dict_codes); - } - MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] ->convert_dict_column_to_string_column(dict_column); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index c31c63ee87c..9c368b6a7a6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -454,11 +454,6 @@ Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_co return Status::OK(); } -Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string, - std::vector<int32_t>* dict_codes) { - return _chunk_reader->get_dict_codes(column_string, dict_codes); -} - MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column( const ColumnInt32* dict_column) { return _chunk_reader->convert_dict_column_to_string_column(dict_column); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index f0eadb8bcd6..4c6e5b1eac9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -128,11 +128,6 @@ public: return Status::NotSupported("read_dict_values_to_column is not supported"); } - virtual Status get_dict_codes(const ColumnString* column_string, - std::vector<int32_t>* dict_codes) { - return Status::NotSupported("get_dict_codes is not supported"); - } - virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) { LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported"; __builtin_unreachable(); @@ -180,8 +175,6 @@ public: ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) override; Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; - Status get_dict_codes(const ColumnString* column_string, - std::vector<int32_t>* dict_codes) override; MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } const std::vector<level_t>& get_def_level() const override { return _def_levels; } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index b70beec687b..4e7f53ca75e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -841,7 +841,7 @@ Status RowGroupReader::_rewrite_dict_predicates() { ++index; } - // 2.2 Execute conjuncts and filter block. + // 2.2 Execute conjuncts. VExprContextSPtrs ctxs; auto iter = _slot_id_to_filter_conjuncts->find(slot_id); if (iter != _slot_id_to_filter_conjuncts->end()) { @@ -854,33 +854,39 @@ Status RowGroupReader::_rewrite_dict_predicates() { return Status::NotFound(msg.str()); } - std::vector<uint32_t> columns_to_filter(1, dict_pos); - int column_to_keep = temp_block.columns(); if (dict_pos != 0) { // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 // The following process may be tricky and time-consuming, but we have no other way. temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } + IColumn::Filter result_filter(temp_block.rows(), 1); + bool can_filter_all; { SCOPED_RAW_TIMER(&_predicate_filter_time); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block( - ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, + &result_filter, &can_filter_all)); } if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); } - // Check some conditions. - ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column; - // If dict_column->size() == 0, can filter this row group. - if (dict_column->size() == 0) { + // If can_filter_all = true, can filter this row group. + if (can_filter_all) { _is_row_group_filtered = true; return Status::OK(); } + // 3. Get dict codes. + std::vector<int32_t> dict_codes; + for (size_t i = 0; i < result_filter.size(); ++i) { + if (result_filter[i]) { + dict_codes.emplace_back(i); + } + } + // About Performance: if dict_column size is too large, it will generate a large IN filter. - if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { + if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { it = _dict_filter_cols.erase(it); for (auto& ctx : ctxs) { _filter_conjuncts.push_back(ctx); @@ -888,22 +894,9 @@ Status RowGroupReader::_rewrite_dict_predicates() { continue; } - // 3. Get dict codes. - std::vector<int32_t> dict_codes; - if (dict_column->is_nullable()) { - const ColumnNullable* nullable_column = - static_cast<const ColumnNullable*>(dict_column.get()); - const ColumnString* nested_column = static_cast<const ColumnString*>( - nullable_column->get_nested_column_ptr().get()); - RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( - assert_cast<const ColumnString*>(nested_column), &dict_codes)); - } else { - RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( - assert_cast<const ColumnString*>(dict_column.get()), &dict_codes)); - } - // 4. Rewrite conjuncts. - RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable())); + RETURN_IF_ERROR(_rewrite_dict_conjuncts( + dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable())); ++it; } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org