This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 76592a3ff8ba4e981d4dd2057a5a8408f629b5ab Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue Oct 31 15:33:37 2023 +0800 fix bug in shared hash table (#26158) --- be/src/pipeline/exec/hashjoin_probe_operator.h | 1 + be/src/vec/common/hash_table/hash_map.h | 34 ++++++++-------------- .../vec/exec/join/process_hash_table_probe_impl.h | 6 ++-- be/src/vec/exec/join/vhash_join_node.h | 1 + 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 181934e7b50..31a3bd38b93 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -99,6 +99,7 @@ private: friend struct vectorized::ProcessHashTableProbe; int _probe_index = -1; + uint32_t _build_index = 0; bool _ready_probe = false; bool _probe_eos = false; std::atomic<bool> _probe_inited = false; diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 07528b857fa..cafe01e8231 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -248,13 +248,13 @@ public: template <int JoinOpType> auto find_batch(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, - int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs, - uint32_t* __restrict build_idxs) { + int probe_idx, uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN || JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) { - return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, probe_idx, + return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); } if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || @@ 
-266,7 +266,7 @@ public: JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, probe_rows); } - return std::pair {0, 0}; + return std::tuple {0, 0u, 0}; } template <int JoinOpType> @@ -295,7 +295,7 @@ private: auto _find_batch_right_semi_anti(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, int probe_idx, int probe_rows) { - while (LIKELY(probe_idx < probe_rows)) { + while (probe_idx < probe_rows) { auto build_idx = first[bucket_nums[probe_idx]]; while (build_idx) { @@ -306,7 +306,7 @@ private: } probe_idx++; } - return std::pair {probe_idx, 0}; + return std::tuple {probe_idx, 0u, 0}; } template <int JoinOpType> @@ -331,17 +331,17 @@ private: matched_cnt += matched; probe_idxs[matched_cnt - matched] = probe_idx++; } - return std::pair {probe_idx, matched_cnt}; + return std::tuple {probe_idx, 0u, matched_cnt}; } template <int JoinOpType> auto _find_batch_inner_outer_join(const Key* __restrict keys, const uint32_t* __restrict bucket_nums, int probe_idx, - int probe_rows, uint32_t* __restrict probe_idxs, + uint32_t build_idx, int probe_rows, + uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { auto matched_cnt = 0; const auto batch_size = max_batch_size; - size_t build_idx = 0; auto do_the_probe = [&]() { while (build_idx && matched_cnt < batch_size) { @@ -369,10 +369,7 @@ private: probe_idx++; }; - if (probe_idx == current_probe_idx) { - current_probe_idx = -1; - build_idx = current_build_idx; - current_build_idx = 0; + if (build_idx) { do_the_probe(); } @@ -382,12 +379,8 @@ private: do_the_probe(); } - if (matched_cnt == batch_size && build_idx) { - probe_idx--; - current_probe_idx = probe_idx; - current_build_idx = build_idx; - } - return std::pair {probe_idx, matched_cnt}; + probe_idx -= (matched_cnt == batch_size && build_idx); + return std::tuple {probe_idx, build_idx, matched_cnt}; } const Key* __restrict build_keys; @@ -396,9 +389,6 @@ private: uint32_t bucket_size 
= 0; int max_batch_size = 0; - int current_probe_idx = -1; - uint32_t current_build_idx = 0; - std::vector<uint32_t> first; std::vector<uint32_t> next; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 248e8f42328..6a21086f50e 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -193,6 +193,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash Block* output_block, size_t probe_rows) { auto& probe_index = _parent->_probe_index; + auto& build_index = _parent->_build_index; using Mapped = typename HashTableType::Mapped; @@ -233,11 +234,12 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash { SCOPED_TIMER(_search_hashtable_timer); - auto [new_probe_idx, new_current_offset] = + auto [new_probe_idx, new_build_idx, new_current_offset] = hash_table_ctx.hash_table->template find_batch<JoinOpType>( hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index, - probe_rows, _probe_indexs.data(), _build_indexs.data()); + build_index, probe_rows, _probe_indexs.data(), _build_indexs.data()); probe_index = new_probe_idx; + build_index = new_build_idx; current_offset = new_current_offset; probe_size = probe_index - last_probe_index; } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 5633d606d01..7988cc598bd 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -386,6 +386,7 @@ private: bool _has_set_need_null_map_for_build = false; bool _probe_ignore_null = false; int _probe_index = -1; + uint32_t _build_index = 0; bool _ready_probe = false; bool _probe_eos = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org