This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch new_join in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push: new 707b7a4d671 opt for some join (#26513) 707b7a4d671 is described below commit 707b7a4d6712d6b7c5cc993bab3f2a9c522e6930 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Nov 9 08:42:25 2023 +0800 opt for some join (#26513) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 +- be/src/vec/columns/column_vector.cpp | 13 +---- be/src/vec/common/hash_table/hash_map.h | 74 +++++++++++++++++++--------- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- 4 files changed, 56 insertions(+), 36 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index c7af4f89ba4..748246a8f27 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -483,7 +483,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* _build_expr_ctxs, _runtime_filter_descs); RETURN_IF_ERROR(local_state._runtime_filter_slots->init( - state, arg.hash_table->size(), 0)); + state, arg.hash_table->size(), + local_state._build_rf_cardinality)); RETURN_IF_ERROR( local_state._runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 4575d089781..a825e07d5f2 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -397,17 +397,8 @@ void ColumnVector<T>::insert_indices_from_join(const IColumn& src, const uint32_ const T* __restrict src_data = reinterpret_cast<const T*>(src.get_raw_data().data); - if constexpr (std::is_same_v<T, UInt8>) { - // nullmap : indices_begin[i] == 0 means is null at the here, set true here - for (uint32_t i = 0; i < new_size; ++i) { - data[origin_size + i] = - (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]]; - } - } else { - // real data : indices_begin[i] == 0 what at is meaningless - for (uint32_t i = 0; i < new_size; ++i) { - data[origin_size + i] = src_data[indices_begin[i]]; - } + for (uint32_t i = 0; i < new_size; ++i) { + data[origin_size + i] = src_data[indices_begin[i]]; } } diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index a9506869875..d6c41bbc818 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -24,6 +24,7 @@ #include <span> +#include "common/compiler_util.h" #include "vec/common/hash_table/hash.h" #include "vec/common/hash_table/hash_table.h" #include "vec/common/hash_table/hash_table_allocator.h" @@ -213,7 +214,7 @@ public: using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable; static uint32_t calc_bucket_size(size_t num_elem) { - size_t expect_bucket_size = static_cast<size_t>(num_elem) + (num_elem - 1) / 7; + size_t expect_bucket_size = num_elem + (num_elem - 1) / 7; return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; } @@ -271,10 +272,14 @@ public: } if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { - return _find_batch_right_semi_anti<with_other_conjuncts>( - keys, bucket_nums, probe_idx, probe_rows, probe_idxs, build_idxs); + if constexpr (!with_other_conjuncts) { + return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, probe_rows); + } else { + return _find_batch_right_semi_anti_conjunct(keys, bucket_nums, probe_idx, build_idx, + probe_rows, probe_idxs, build_idxs); + } } - return std::tuple {0, 0u, 0}; + return std::tuple {0, 0U, 0}; } template <int JoinOpType> @@ -300,30 +305,57 @@ public: } private: - template <bool with_other_conjuncts> auto _find_batch_right_semi_anti(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, int probe_idx, - int probe_rows, uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs) { + int probe_rows) { auto matched_cnt = 0; while (probe_idx < probe_rows) { auto build_idx = first[bucket_nums[probe_idx]]; while (build_idx) { + if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { + visited[build_idx] = 1; + } + build_idx = next[build_idx]; + } + probe_idx++; + } + return std::tuple {probe_idx, 0U, matched_cnt}; + } + + auto _find_batch_right_semi_anti_conjunct(const Key* __restrict keys, + const uint32_t* __restrict bucket_nums, int probe_idx, + uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, + uint32_t* __restrict build_idxs) { + auto matched_cnt = 0; + const auto batch_size = max_batch_size; + + auto do_the_probe = [&]() { + auto matched_cnt_old = matched_cnt; + while (build_idx && matched_cnt < batch_size) { if (keys[probe_idx] == build_keys[build_idx]) { - if constexpr (with_other_conjuncts) { - build_idxs[matched_cnt] = build_idx; - probe_idxs[matched_cnt] = probe_idx; - matched_cnt++; - } else { - visited[build_idx] = 1; - } + build_idxs[matched_cnt++] = build_idx; } build_idx = next[build_idx]; } + for (auto i = matched_cnt_old; i < matched_cnt; i++) { + probe_idxs[i] = probe_idx; + } probe_idx++; + }; + + if (build_idx) { + do_the_probe(); + } + + while (probe_idx < probe_rows && matched_cnt < batch_size) { + build_idx = first[bucket_nums[probe_idx]]; + do_the_probe(); } - return std::tuple {probe_idx, 0u, matched_cnt}; + + probe_idx -= (matched_cnt == batch_size && build_idx); + return std::tuple {probe_idx, build_idx, matched_cnt}; } template <int JoinOpType> @@ -334,21 +366,17 @@ private: const auto batch_size = max_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { - uint32_t bucket_num = bucket_nums[probe_idx]; - auto build_idx = first[bucket_num]; + auto build_idx = first[bucket_nums[probe_idx]]; - while (build_idx) { - if (keys[probe_idx] == build_keys[build_idx]) { - break; - } + while (build_idx && keys[probe_idx] != build_keys[build_idx]) { build_idx = next[build_idx]; } bool matched = JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx != 0 : build_idx == 0; + probe_idxs[matched_cnt] = probe_idx++; matched_cnt += matched; - probe_idxs[matched_cnt - matched] = probe_idx++; } - return std::tuple {probe_idx, 0u, matched_cnt}; + return std::tuple {probe_idx, 0U, matched_cnt}; } template <int JoinOpType, bool with_other_conjuncts> diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 4d0ed465400..dc13bb44ee4 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -816,7 +816,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _build_expr_ctxs, _runtime_filter_descs); RETURN_IF_ERROR(_runtime_filter_slots->init( - state, arg.hash_table->size(), 0)); + state, arg.hash_table->size(), _build_rf_cardinality)); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); RETURN_IF_ERROR(_runtime_filter_slots->publish()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org