This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 65a32f76b05 [Feature](join) Support lazy materialization of columns that are not used in join other conjunct (#49073) 65a32f76b05 is described below commit 65a32f76b053f1f345f8e53e413152728ac1eb90 Author: Pxl <x...@selectdb.com> AuthorDate: Tue Mar 25 11:57:59 2025 +0800 [Feature](join) Support lazy materialization of columns that are not used in join other conjunct (#49073) ### What problem does this PR solve? 1. Support lazy materialization of columns that are not used in join other conjunct 2. Simplify some code before: ``` HASH_JOIN_OPERATOR (id=6 , nereids_id=1089):(ExecTime: 10sec315ms) - BlocksProduced: 90.294K (90294) - ExecTime: 10sec315ms - NonEqualJoinConjunctEvaluationTime: 2sec454ms - ProbeRows: 1.0M (1000000) - ProbeWhenBuildSideOutputTime: 2sec223ms - ProbeWhenProbeSideOutputTime: 3sec33ms - ProbeWhenSearchHashTableTime: 1sec877ms - ProjectionTime: 155.383ms - RowsProduced: 1.0M (1000000) ``` after: ``` HASH_JOIN_OPERATOR (id=6 , nereids_id=1108):(ExecTime: 5sec669ms) - BlocksProduced: 90.294K (90294) - ExecTime: 5sec669ms - NonEqualJoinConjunctEvaluationTime: 2sec111ms - ProbeRows: 1.0M (1000000) - ProbeWhenBuildSideOutputTime: 689.966ms - ProbeWhenProbeSideOutputTime: 350.553ms - ProbeWhenSearchHashTableTime: 1sec883ms - ProjectionTime: 150.629ms - RowsProduced: 1.0M (1000000) ``` ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [x] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 40 +--- be/src/pipeline/exec/hashjoin_probe_operator.h | 16 +- .../pipeline/exec/join/process_hash_table_probe.h | 41 ++-- .../exec/join/process_hash_table_probe_impl.h | 250 +++++++++++++-------- be/src/vec/common/hash_table/join_hash_table.h | 4 +- be/src/vec/exprs/vexpr.h | 6 + be/src/vec/exprs/vslot_ref.h | 4 + 7 files changed, 196 insertions(+), 165 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 7f273680f6d..a432b59cfa2 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -93,34 +93,6 @@ void HashJoinProbeLocalState::prepare_for_next() { _prepare_probe_block(); } -bool HashJoinProbeLocalState::have_other_join_conjunct() const { - return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct; -} - -bool HashJoinProbeLocalState::is_right_semi_anti() const { - return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti; -} - -bool HashJoinProbeLocalState::is_outer_join() const { - return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join; -} - -std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() { - return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags; -} - -std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() { - return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags; -} - -vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() { - return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types; -} - -vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() { - return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types; -} - Status HashJoinProbeLocalState::close(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); @@ -133,12 +105,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) { if (process_hashtable_ctx._arena) { process_hashtable_ctx._arena.reset(); } - - if (process_hashtable_ctx._serialize_key_arena) { - process_hashtable_ctx._serialize_key_arena.reset(); - process_hashtable_ctx._serialized_key_buffer_size = - 0; - } }}, *_process_hashtable_ctx_variants); } @@ -259,7 +225,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc : nullptr, mutable_join_block, &temp_block, cast_set<uint32_t>(local_state._probe_block.rows()), - _is_mark_join, _have_other_join_conjunct); + _is_mark_join); } else { st = Status::InternalError("uninited hash table"); } @@ -522,6 +488,10 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } + for (auto conjunct : _other_join_conjuncts) { + conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids); + } + for (auto& conjunct : _mark_join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index f3f1983975e..6fdf77e250d 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -61,14 +61,7 @@ public: bool* eos, vectorized::Block* temp_block, bool check_rows_count = true); - bool have_other_join_conjunct() const; - bool is_right_semi_anti() const; - bool is_outer_join() const; - std::vector<bool>* left_output_slot_flags(); - std::vector<bool>* right_output_slot_flags(); - vectorized::DataTypes right_table_data_types(); - vectorized::DataTypes left_table_data_types(); - bool* has_null_in_build_side() { return &_shared_state->_has_null_in_build_side; } + bool has_null_in_build_side() { return _shared_state->_has_null_in_build_side; } const std::shared_ptr<vectorized::Block>& build_block() const { return _shared_state->build_block; } @@ -161,11 +154,17 @@ public: bool need_finalize_variant_column() const { return _need_finalize_variant_column; } + bool is_lazy_materialized_column(int column_id) const { + return _have_other_join_conjunct && !_other_conjunct_refer_column_ids.contains(column_id); + } + private: Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids) const; friend class HashJoinProbeLocalState; + template <int JoinOpType> + friend struct ProcessHashTableProbe; const TJoinDistributionType::type _join_distribution; @@ -184,6 +183,7 @@ private: std::vector<bool> _left_output_slot_flags; std::vector<bool> _right_output_slot_flags; bool _need_finalize_variant_column = false; + std::set<int> _other_conjunct_refer_column_ids; std::vector<std::string> _right_table_column_names; const std::vector<TExpr> _partition_exprs; }; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 8cfcfc75e1b..0019b180de3 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -22,7 +22,6 @@ #include "vec/columns/column.h" #include "vec/columns/columns_number.h" #include "vec/common/arena.h" -#include "vec/common/hash_table/join_hash_table.h" namespace doris { namespace vectorized { @@ -33,12 +32,13 @@ struct HashJoinProbeContext; namespace pipeline { class HashJoinProbeLocalState; +class HashJoinProbeOperatorX; using MutableColumnPtr = vectorized::IColumn::MutablePtr; using MutableColumns = std::vector<vectorized::MutableColumnPtr>; using NullMap = vectorized::ColumnUInt8::Container; -using ConstNullMapPtr = const vectorized::NullMap*; +using ConstNullMapPtr = const NullMap*; template <int JoinOpType> struct ProcessHashTableProbe { @@ -46,24 +46,19 @@ struct ProcessHashTableProbe { ~ProcessHashTableProbe() = default; // output build side result column - void build_side_output_column(vectorized::MutableColumns& mcol, - const std::vector<bool>& output_slot_flags, int size, - bool have_other_join_conjunct, bool is_mark_join); + void build_side_output_column(vectorized::MutableColumns& mcol, int size, bool is_mark_join); - void probe_side_output_column(vectorized::MutableColumns& mcol, - const std::vector<bool>& output_slot_flags, int size, - bool all_match_one, bool have_other_join_conjunct); + void probe_side_output_column(vectorized::MutableColumns& mcol, int size, bool all_match_one); template <typename HashTableType> Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); + uint32_t probe_rows, bool is_mark_join); // Only process the join with no other join conjunct, because of no other join conjunt // the output block struct is same with mutable block. we can do more opt on it and simplify // the logic of probe - // TODO: opt the visited here to reduce the size of hash table - template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join> + template <typename HashTableType, bool is_mark_join> Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, uint32_t probe_rows); @@ -74,12 +69,13 @@ struct ProcessHashTableProbe { Status do_other_join_conjuncts(vectorized::Block* output_block, DorisVector<uint8_t>& visited, bool has_null_in_build_side); - template <bool with_other_conjuncts> Status do_mark_join_conjuncts(vectorized::Block* output_block, const uint8_t* null_map); + Status finalize_block_with_filter(vectorized::Block* output_block, size_t filter_column_id, + size_t column_to_keep); + template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, - bool with_other_join_conjuncts, const uint8_t* null_map); // Process full outer join/ right join / right semi/anti join to output the join result @@ -93,15 +89,17 @@ struct ProcessHashTableProbe { uint32_t _process_probe_null_key(uint32_t probe_idx); pipeline::HashJoinProbeLocalState* _parent = nullptr; + pipeline::HashJoinProbeOperatorX* _parent_operator = nullptr; + const int _batch_size; const std::shared_ptr<vectorized::Block>& _build_block; std::unique_ptr<vectorized::Arena> _arena; - std::vector<StringRef> _probe_keys; - std::vector<uint32_t> _probe_indexs; + vectorized::ColumnVector<uint32_t> _probe_indexs; + vectorized::ColumnVector<uint32_t> _output_row_indexs; bool _probe_visited = false; bool _picking_null_keys = false; - std::vector<uint32_t> _build_indexs; + vectorized::ColumnVector<uint32_t> _build_indexs; std::vector<uint8_t> _null_flags; /// If the probe key of one row on left side is null, @@ -111,19 +109,14 @@ struct ProcessHashTableProbe { std::vector<int> _build_blocks_locs; - size_t _serialized_key_buffer_size {0}; - uint8_t* _serialized_key_buffer = nullptr; - std::unique_ptr<vectorized::Arena> _serialize_key_arena; std::vector<char> _probe_side_find_result; - bool _have_other_join_conjunct; - bool _is_right_semi_anti; - std::vector<bool>* _left_output_slot_flags = nullptr; - std::vector<bool>* _right_output_slot_flags = nullptr; + const bool _have_other_join_conjunct; + const std::vector<bool>& _left_output_slot_flags; + const std::vector<bool>& _right_output_slot_flags; // nullable column but not has null except first row std::vector<bool> _build_column_has_null; bool _need_calculate_build_index_has_zero = true; - bool* _has_null_in_build_side; RuntimeProfile::Counter* _search_hashtable_timer = nullptr; RuntimeProfile::Counter* _init_probe_side_timer = nullptr; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index b06e7c7c388..4c46d581bc4 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -25,10 +25,10 @@ #include "process_hash_table_probe.h" #include "runtime/thread_context.h" // IWYU pragma: keep #include "util/simd/bits.h" +#include "vec/columns/column_const.h" #include "vec/columns/column_filter_helper.h" #include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" -#include "vec/common/hash_table/join_hash_table.h" #include "vec/exprs/vexpr_context.h" namespace doris::pipeline { @@ -37,44 +37,42 @@ template <int JoinOpType> ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size) : _parent(parent), + _parent_operator(&parent->_parent->template cast<HashJoinProbeOperatorX>()), _batch_size(batch_size), _build_block(parent->build_block()), - _have_other_join_conjunct(parent->have_other_join_conjunct()), - _is_right_semi_anti(parent->is_right_semi_anti()), - _left_output_slot_flags(parent->left_output_slot_flags()), - _right_output_slot_flags(parent->right_output_slot_flags()), - _has_null_in_build_side(parent->has_null_in_build_side()), + _have_other_join_conjunct(_parent_operator->_have_other_join_conjunct), + _left_output_slot_flags(_parent_operator->_left_output_slot_flags), + _right_output_slot_flags(_parent_operator->_right_output_slot_flags), _search_hashtable_timer(parent->_search_hashtable_timer), _init_probe_side_timer(parent->_init_probe_side_timer), _build_side_output_timer(parent->_build_side_output_timer), _probe_side_output_timer(parent->_probe_side_output_timer), _finish_probe_phase_timer(parent->_finish_probe_phase_timer), - _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct) + _right_col_idx((_parent_operator->_is_right_semi_anti && !_have_other_join_conjunct) ? 0 - : _parent->left_table_data_types().size()), - _right_col_len(_parent->right_table_data_types().size()) {} + : _parent_operator->_left_table_data_types.size()), + _right_col_len(_parent_operator->_right_table_data_types.size()) {} template <int JoinOpType> -void ProcessHashTableProbe<JoinOpType>::build_side_output_column( - vectorized::MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, - bool have_other_join_conjunct, bool is_mark_join) { +void ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::MutableColumns& mcol, + int size, bool is_mark_join) { SCOPED_TIMER(_build_side_output_timer); // indicates whether build_indexs contain 0 bool build_index_has_zero = (JoinOpType != TJoinOp::INNER_JOIN && JoinOpType != TJoinOp::RIGHT_OUTER_JOIN) || - have_other_join_conjunct || is_mark_join; + _have_other_join_conjunct || is_mark_join; if (!size) { return; } if (!build_index_has_zero && _build_column_has_null.empty()) { _need_calculate_build_index_has_zero = false; - _build_column_has_null.resize(output_slot_flags.size()); + _build_column_has_null.resize(_right_output_slot_flags.size()); for (int i = 0; i < _right_col_len; i++) { const auto& column = *_build_block->safe_get_by_position(i).column; _build_column_has_null[i] = false; - if (output_slot_flags[i] && column.is_nullable()) { + if (_right_output_slot_flags[i] && column.is_nullable()) { const auto& nullable = assert_cast<const vectorized::ColumnNullable&>(column); _build_column_has_null[i] = !simd::contain_byte( nullable.get_null_map_data().data() + 1, nullable.size() - 1, 1); @@ -83,16 +81,18 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column( } } - for (size_t i = 0; i < _right_col_len && i + _right_col_idx < mcol.size(); i++) { + for (int i = 0; i < _right_col_len && i + _right_col_idx < mcol.size(); i++) { const auto& column = *_build_block->safe_get_by_position(i).column; - if (output_slot_flags[i]) { + if (_right_output_slot_flags[i] && + !_parent_operator->is_lazy_materialized_column(i + (int)_right_col_idx)) { if (!build_index_has_zero && _build_column_has_null[i]) { assert_cast<vectorized::ColumnNullable*>(mcol[i + _right_col_idx].get()) - ->insert_indices_from_not_has_null(column, _build_indexs.data(), - _build_indexs.data() + size); + ->insert_indices_from_not_has_null(column, _build_indexs.get_data().data(), + _build_indexs.get_data().data() + size); } else { - mcol[i + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), - _build_indexs.data() + size); + mcol[i + _right_col_idx]->insert_indices_from( + column, _build_indexs.get_data().data(), + _build_indexs.get_data().data() + size); } } else if (i + _right_col_idx != _parent->_mark_column_id) { mcol[i + _right_col_idx]->insert_default(); @@ -115,23 +115,25 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column( } template <int JoinOpType> -void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( - vectorized::MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, - bool all_match_one, bool have_other_join_conjunct) { +void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns& mcol, + int size, bool all_match_one) { SCOPED_TIMER(_probe_side_output_timer); auto& probe_block = _parent->_probe_block; - for (int i = 0; i < output_slot_flags.size(); ++i) { - if (output_slot_flags[i]) { - if (auto& p = _parent->parent()->cast<HashJoinProbeOperatorX>(); - p.need_finalize_variant_column()) { + + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + if (_parent_operator->need_finalize_variant_column()) { std::move(*probe_block.get_by_position(i).column).mutate()->finalize(); } + } + + if (_left_output_slot_flags[i] && !_parent_operator->is_lazy_materialized_column(i)) { auto& column = probe_block.get_by_position(i).column; if (all_match_one) { - mcol[i]->insert_range_from(*column, _probe_indexs[0], size); + mcol[i]->insert_range_from(*column, _probe_indexs.get_element(0), size); } else { - mcol[i]->insert_indices_from(*column, _probe_indexs.data(), - _probe_indexs.data() + size); + mcol[i]->insert_indices_from(*column, _probe_indexs.get_data().data(), + _probe_indexs.get_data().data() + size); } } else { mcol[i]->insert_default(); @@ -143,14 +145,13 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( template <int JoinOpType> template <typename HashTableType> typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side( - HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, - const uint8_t* null_map) { + HashTableType& hash_table_ctx, size_t probe_rows, const uint8_t* null_map) { // may over batch size 1 for some outer join case _probe_indexs.resize(_batch_size + 1); _build_indexs.resize(_batch_size + 1); if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && - with_other_join_conjuncts) { + _have_other_join_conjunct) { _null_flags.resize(_batch_size + 1); } @@ -174,7 +175,7 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid } template <int JoinOpType> -template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join> +template <typename HashTableType, bool is_mark_join> Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx, const uint8_t* null_map, vectorized::MutableBlock& mutable_block, @@ -188,7 +189,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c auto& build_index = _parent->_build_index; { SCOPED_TIMER(_init_probe_side_timer); - _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, null_map); + _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, null_map); } auto& mcol = mutable_block.mutable_columns(); @@ -196,7 +197,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c uint32_t current_offset = 0; if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && - with_other_conjuncts) { + _have_other_join_conjunct) { SCOPED_TIMER(_search_hashtable_timer); /// If `_build_index_for_null_probe_key` is not zero, it means we are in progress of handling probe null key. @@ -214,8 +215,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c auto [new_probe_idx, new_build_idx, new_current_offset, picking_null_keys] = hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts( hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, - build_index, probe_rows, _probe_indexs.data(), _build_indexs.data(), - _null_flags.data(), _picking_null_keys, null_map); + build_index, probe_rows, _probe_indexs.get_data().data(), + _build_indexs.get_data().data(), _null_flags.data(), _picking_null_keys, + null_map); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; @@ -237,20 +239,20 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c auto [new_probe_idx, new_build_idx, new_current_offset] = hash_table_ctx.hash_table->template find_batch<JoinOpType>( hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, - build_index, cast_set<int32_t>(probe_rows), _probe_indexs.data(), - _probe_visited, _build_indexs.data(), null_map, with_other_conjuncts, - is_mark_join, !_parent->_mark_join_conjuncts.empty()); + build_index, cast_set<int32_t>(probe_rows), _probe_indexs.get_data().data(), + _probe_visited, _build_indexs.get_data().data(), null_map, + _have_other_join_conjunct, is_mark_join, + !_parent->_mark_join_conjuncts.empty()); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; } - build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts, - is_mark_join); + build_side_output_column(mcol, current_offset, is_mark_join); - if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { - auto check_all_match_one = [](const std::vector<uint32_t>& vecs, int size) { + if (_have_other_join_conjunct || + (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { + auto check_all_match_one = [](const auto& vecs, int size) { if (!size || vecs[size - 1] != vecs[0] + size - 1) { return false; } @@ -262,9 +264,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c return true; }; - probe_side_output_column(mcol, *_left_output_slot_flags, current_offset, - check_all_match_one(_probe_indexs, current_offset), - with_other_conjuncts); + probe_side_output_column(mcol, current_offset, + check_all_match_one(_probe_indexs.get_data(), current_offset)); } output_block->swap(mutable_block.to_block()); @@ -275,9 +276,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && hash_table_ctx.hash_table ->empty_build_side(); // empty build side will return false to instead null - return do_mark_join_conjuncts<with_other_conjuncts>(output_block, - ignore_null_map ? nullptr : null_map); - } else if constexpr (with_other_conjuncts) { + return do_mark_join_conjuncts(output_block, ignore_null_map ? nullptr : null_map); + } else if (_have_other_join_conjunct) { return do_other_join_conjuncts(output_block, hash_table_ctx.hash_table->get_visited(), hash_table_ctx.hash_table->has_null_key()); } @@ -293,15 +293,15 @@ uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro DCHECK_LT(0, _build_index_for_null_probe_key); uint32_t matched_cnt = 0; for (; _build_index_for_null_probe_key < rows && matched_cnt < _batch_size; ++matched_cnt) { - _probe_indexs[matched_cnt] = probe_index; - _build_indexs[matched_cnt] = _build_index_for_null_probe_key++; + _probe_indexs.get_element(matched_cnt) = probe_index; + _build_indexs.get_element(matched_cnt) = _build_index_for_null_probe_key++; _null_flags[matched_cnt] = 1; } if (_build_index_for_null_probe_key == rows) { _build_index_for_null_probe_key = 0; - _probe_indexs[matched_cnt] = probe_index; - _build_indexs[matched_cnt] = 0; + _probe_indexs.get_element(matched_cnt) = probe_index; + _build_indexs.get_element(matched_cnt) = 0; _null_flags[matched_cnt] = 0; matched_cnt++; } @@ -309,6 +309,65 @@ uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro return matched_cnt; } +template <int JoinOpType> +Status ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter( + vectorized::Block* output_block, size_t filter_column_id, size_t column_to_keep) { + vectorized::ColumnPtr filter_ptr = output_block->get_by_position(filter_column_id).column; + RETURN_IF_ERROR( + vectorized::Block::filter_block(output_block, filter_column_id, column_to_keep)); + + auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags, + vectorized::ColumnVector<unsigned int>& row_indexs, + int column_offset, vectorized::Block* source_block) { + if (!_have_other_join_conjunct) { + return; + } + std::vector<int> column_ids; + for (int i = 0; i < output_slot_flags.size(); ++i) { + if (output_slot_flags[i] && + _parent_operator->is_lazy_materialized_column(i + column_offset)) { + column_ids.push_back(i); + } + } + if (column_ids.empty()) { + return; + } + size_t row_count = filter_ptr->size(); + // input row_indexs's size may bigger than row_count coz _init_probe_side + row_indexs.resize(row_count); + + bool need_filter = + simd::count_zero_num( + (int8_t*)assert_cast<const vectorized::ColumnUInt8*>(filter_ptr.get()) + ->get_data() + .data(), + row_count) != 0; + if (need_filter) { + const auto& column_filter = + assert_cast<const vectorized::ColumnUInt8*>(filter_ptr.get())->get_data(); + row_indexs.filter(column_filter); + } + + const auto& container = row_indexs.get_data(); + for (int column_id : column_ids) { + int output_column_id = column_id + column_offset; + output_block->get_by_position(output_column_id).column = + assert_cast<const vectorized::ColumnConst*>( + output_block->get_by_position(output_column_id).column.get()) + ->get_data_column_ptr(); + + auto& src = source_block->get_by_position(column_id).column; + auto dst = output_block->get_by_position(output_column_id).column->assume_mutable(); + dst->clear(); + dst->insert_indices_from(*src, container.data(), container.data() + container.size()); + } + }; + do_lazy_materialize(_right_output_slot_flags, _build_indexs, (int)_right_col_idx, + _build_block.get()); + do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0, &_parent->_probe_block); + return Status::OK(); +} + /** * Mark join: there is a column named mark column which stores the result of mark join conjunct. * For example: @@ -343,7 +402,6 @@ uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro * So this query will be a "null aware left anti join", which means the equal conjunct's result should be nullable. */ template <int JoinOpType> -template <bool with_other_conjuncts> Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* output_block, const uint8_t* null_map) { DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN || @@ -377,23 +435,23 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo // select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true // select 4 in (2, 3, null) => null, select 4 in (2, 3) => false for (size_t i = 0; i != row_count; ++i) { - mark_filter_data[i] = _build_indexs[i] != 0; + mark_filter_data[i] = _build_indexs.get_element(i) != 0; } - if constexpr (with_other_conjuncts) { + if (_have_other_join_conjunct) { // _null_flags is true means build or probe side of the row is null memcpy(mark_null_map, _null_flags.data(), row_count); } else { if (null_map) { // probe side of the row is null, so the mark sign should also be null. for (size_t i = 0; i != row_count; ++i) { - mark_null_map[i] |= null_map[_probe_indexs[i]]; + mark_null_map[i] |= null_map[_probe_indexs.get_element(i)]; } } - if (!with_other_conjuncts && *_has_null_in_build_side) { + if (!_have_other_join_conjunct && _parent->has_null_in_build_side()) { // _has_null_in_build_side will change false to null when row not matched for (size_t i = 0; i != row_count; ++i) { - mark_null_map[i] |= _build_indexs[i] == 0; + mark_null_map[i] |= _build_indexs.get_element(i) == 0; } } } @@ -401,11 +459,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo // for non null aware join, build_indexs is 0 which means there is no match // sometimes null will be returned in conjunct, but it should not actually be null. for (size_t i = 0; i != row_count; ++i) { - mark_null_map[i] &= _build_indexs[i] != 0; + mark_null_map[i] &= _build_indexs.get_element(i) != 0; } } - if constexpr (with_other_conjuncts) { + if (_have_other_join_conjunct) { vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); { bool can_be_filter_all = false; @@ -428,19 +486,20 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo auto filter_column = vectorized::ColumnUInt8::create(row_count, 0); auto* __restrict filter_map = filter_column->get_data().data(); for (size_t i = 0; i != row_count; ++i) { - if (_parent->_last_probe_match == _probe_indexs[i]) { + if (_parent->_last_probe_match == _probe_indexs.get_element(i)) { continue; } - if (_build_indexs[i] == 0) { - bool has_null_mark_value = _parent->_last_probe_null_mark == _probe_indexs[i]; + if (_build_indexs.get_element(i) == 0) { + bool has_null_mark_value = + _parent->_last_probe_null_mark == _probe_indexs.get_element(i); filter_map[i] = true; mark_filter_data[i] = false; mark_null_map[i] |= has_null_mark_value; } else if (mark_null_map[i]) { - _parent->_last_probe_null_mark = _probe_indexs[i]; + _parent->_last_probe_null_mark = _probe_indexs.get_element(i); } else if (mark_filter_data[i]) { filter_map[i] = true; - _parent->_last_probe_match = _probe_indexs[i]; + _parent->_last_probe_match = _probe_indexs.get_element(i); } } @@ -454,7 +513,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo auto result_column_id = output_block->columns(); output_block->insert( {std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); - return vectorized::Block::filter_block(output_block, result_column_id, result_column_id); + return finalize_block_with_filter(output_block, result_column_id, result_column_id); } template <int JoinOpType> @@ -495,24 +554,24 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl // process equal-conjuncts-matched tuples that are newly generated // in this run if there are any. - for (int i = 0; i < row_count; ++i) { - bool join_hit = _build_indexs[i]; + for (size_t i = 0; i < row_count; ++i) { + bool join_hit = _build_indexs.get_element(i); bool other_hit = filter_column_ptr[i]; if (!join_hit) { - filter_map[i] = _parent->_last_probe_match != _probe_indexs[i]; + filter_map[i] = _parent->_last_probe_match != _probe_indexs.get_element(i); } else { filter_map[i] = other_hit; } if (filter_map[i]) { - _parent->_last_probe_match = _probe_indexs[i]; + _parent->_last_probe_match = _probe_indexs.get_element(i); } } for (size_t i = 0; i < row_count; ++i) { if (filter_map[i]) { if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - visited[_build_indexs[i]] = 1; + visited[_build_indexs.get_element(i)] = 1; } } } @@ -524,24 +583,24 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl auto* __restrict filter_map = new_filter_column->get_data().data(); for (size_t i = 0; i < row_count; ++i) { - bool not_matched_before = _parent->_last_probe_match != _probe_indexs[i]; + bool not_matched_before = _parent->_last_probe_match != _probe_indexs.get_element(i); if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (_build_indexs[i] == 0) { + if (_build_indexs.get_element(i) == 0) { filter_map[i] = false; } else if (filter_column_ptr[i]) { filter_map[i] = not_matched_before; - _parent->_last_probe_match = _probe_indexs[i]; + _parent->_last_probe_match = _probe_indexs.get_element(i); } else { filter_map[i] = false; } } else { - if (_build_indexs[i] == 0) { + if (_build_indexs.get_element(i) == 0) { filter_map[i] = not_matched_before; } else { filter_map[i] = false; if (filter_column_ptr[i]) { - _parent->_last_probe_match = _probe_indexs[i]; + _parent->_last_probe_match = _probe_indexs.get_element(i); } } } @@ -551,11 +610,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { for (int i = 0; i < row_count; ++i) { - visited[_build_indexs[i]] |= filter_column_ptr[i]; + visited[_build_indexs.get_element(i)] |= filter_column_ptr[i]; } } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { for (int i = 0; i < row_count; ++i) { - visited[_build_indexs[i]] |= filter_column_ptr[i]; + visited[_build_indexs.get_element(i)] |= filter_column_ptr[i]; } } @@ -568,8 +627,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { orig_columns = _right_col_idx; } - RETURN_IF_ERROR( - vectorized::Block::filter_block(output_block, result_column_id, orig_columns)); + + return finalize_block_with_filter(output_block, result_column_id, orig_columns); } return Status::OK(); @@ -602,17 +661,18 @@ Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab mcol.size(), _right_col_len, _right_col_idx); } for (size_t j = 0; j < _right_col_len; ++j) { - if (_right_output_slot_flags->at(j)) { + if (_right_output_slot_flags[j]) { const auto& column = *_build_block->safe_get_by_position(j).column; - mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), - _build_indexs.data() + block_size); + mcol[j + _right_col_idx]->insert_indices_from( + column, _build_indexs.get_data().data(), + _build_indexs.get_data().data() + block_size); } else { mcol[j + _right_col_idx]->resize(block_size); } } // just resize the left table column in case with other conjunct to make block size is not zero - if (_is_right_semi_anti && _have_other_join_conjunct) { + if (_parent_operator->_is_right_semi_anti && _have_other_join_conjunct) { for (int i = 0; i < _right_col_idx; ++i) { mcol[i]->resize(block_size); } @@ -638,17 +698,15 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, vectorized::ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - uint32_t probe_rows, bool is_mark_join, - bool have_other_join_conjunct) { + uint32_t probe_rows, bool is_mark_join) { Status res; std::visit( - [&](auto is_mark_join, auto have_other_join_conjunct) { - res = do_process<HashTableType, have_other_join_conjunct, is_mark_join>( + [&](auto is_mark_join) { + res = do_process<HashTableType, is_mark_join>( hash_table_ctx, null_map ? null_map->data() : nullptr, mutable_block, output_block, probe_rows); }, - vectorized::make_bool_variant(is_mark_join), - vectorized::make_bool_variant(have_other_join_conjunct)); + vectorized::make_bool_variant(is_mark_join)); return res; } @@ -664,7 +722,7 @@ struct ExtractType<T(U)> { template Status ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + uint32_t probe_rows, bool is_mark_join); \ template Status ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::MutableBlock & mutable_block, \ vectorized::Block * output_block, bool* eos, bool is_mark_join); diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h index faccb4136d3..96d2ab2fcb9 100644 --- a/be/src/vec/common/hash_table/join_hash_table.h +++ b/be/src/vec/common/hash_table/join_hash_table.h @@ -179,7 +179,7 @@ public: } template <int JoinOpType, bool is_mark_join> - bool iterate_map(std::vector<uint32_t>& build_idxs, + bool iterate_map(vectorized::ColumnVector<uint32_t>& build_idxs, vectorized::ColumnFilterHelper* mark_column_helper) const { const auto batch_size = max_batch_size; const auto elem_num = visited.size(); @@ -188,7 +188,7 @@ public: while (count < batch_size && iter_idx < elem_num) { const auto matched = visited[iter_idx]; - build_idxs[count] = iter_idx; + build_idxs.get_element(count) = iter_idx; if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { if constexpr (is_mark_join) { mark_column_helper->insert_value(matched); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index ea4575b7e61..3b42b4bf250 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -262,6 +262,12 @@ public: void set_index_unique_id(uint32_t index_unique_id) { _index_unique_id = index_unique_id; } uint32_t index_unique_id() const { return _index_unique_id; } + virtual void collect_slot_column_ids(std::set<int>& column_ids) const { + for (auto child : _children) { + child->collect_slot_column_ids(column_ids); + } + } + protected: /// Simple debug string that provides no expr subclass-specific information std::string debug_string(const std::string& expr_name) const { diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index 48f08d1d5e0..145428a1cf3 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -59,6 +59,10 @@ public: size_t estimate_memory(const size_t rows) override { return 0; } + void collect_slot_column_ids(std::set<int>& column_ids) const override { + column_ids.insert(_column_id); + } + private: int _slot_id; int _column_id; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org