This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 76592a3ff8ba4e981d4dd2057a5a8408f629b5ab
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue Oct 31 15:33:37 2023 +0800

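    Fix bug in shared hash table: track the probe continuation state (build index) per probe operator instead of inside the shared hash table (#26158)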
---
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  1 +
 be/src/vec/common/hash_table/hash_map.h            | 34 ++++++++--------------
 .../vec/exec/join/process_hash_table_probe_impl.h  |  6 ++--
 be/src/vec/exec/join/vhash_join_node.h             |  1 +
 4 files changed, 18 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 181934e7b50..31a3bd38b93 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -99,6 +99,7 @@ private:
     friend struct vectorized::ProcessHashTableProbe;
 
     int _probe_index = -1;
+    uint32_t _build_index = 0;
     bool _ready_probe = false;
     bool _probe_eos = false;
     std::atomic<bool> _probe_inited = false;
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 07528b857fa..cafe01e8231 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -248,13 +248,13 @@ public:
 
     template <int JoinOpType>
     auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
-                    int probe_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
-                    uint32_t* __restrict build_idxs) {
+                    int probe_idx, uint32_t build_idx, int probe_rows,
+                    uint32_t* __restrict probe_idxs, uint32_t* __restrict 
build_idxs) {
         if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
                       JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
-            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx,
+            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
                                                             probe_rows, 
probe_idxs, build_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
@@ -266,7 +266,7 @@ public:
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
             return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
         }
-        return std::pair {0, 0};
+        return std::tuple {0, 0u, 0};
     }
 
     template <int JoinOpType>
@@ -295,7 +295,7 @@ private:
     auto _find_batch_right_semi_anti(const Key* __restrict keys,
                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
                                      int probe_rows) {
-        while (LIKELY(probe_idx < probe_rows)) {
+        while (probe_idx < probe_rows) {
             auto build_idx = first[bucket_nums[probe_idx]];
 
             while (build_idx) {
@@ -306,7 +306,7 @@ private:
             }
             probe_idx++;
         }
-        return std::pair {probe_idx, 0};
+        return std::tuple {probe_idx, 0u, 0};
     }
 
     template <int JoinOpType>
@@ -331,17 +331,17 @@ private:
             matched_cnt += matched;
             probe_idxs[matched_cnt - matched] = probe_idx++;
         }
-        return std::pair {probe_idx, matched_cnt};
+        return std::tuple {probe_idx, 0u, matched_cnt};
     }
 
     template <int JoinOpType>
     auto _find_batch_inner_outer_join(const Key* __restrict keys,
                                       const uint32_t* __restrict bucket_nums, 
int probe_idx,
-                                      int probe_rows, uint32_t* __restrict 
probe_idxs,
+                                      uint32_t build_idx, int probe_rows,
+                                      uint32_t* __restrict probe_idxs,
                                       uint32_t* __restrict build_idxs) {
         auto matched_cnt = 0;
         const auto batch_size = max_batch_size;
-        size_t build_idx = 0;
 
         auto do_the_probe = [&]() {
             while (build_idx && matched_cnt < batch_size) {
@@ -369,10 +369,7 @@ private:
             probe_idx++;
         };
 
-        if (probe_idx == current_probe_idx) {
-            current_probe_idx = -1;
-            build_idx = current_build_idx;
-            current_build_idx = 0;
+        if (build_idx) {
             do_the_probe();
         }
 
@@ -382,12 +379,8 @@ private:
             do_the_probe();
         }
 
-        if (matched_cnt == batch_size && build_idx) {
-            probe_idx--;
-            current_probe_idx = probe_idx;
-            current_build_idx = build_idx;
-        }
-        return std::pair {probe_idx, matched_cnt};
+        probe_idx -= (matched_cnt == batch_size && build_idx);
+        return std::tuple {probe_idx, build_idx, matched_cnt};
     }
 
     const Key* __restrict build_keys;
@@ -396,9 +389,6 @@ private:
     uint32_t bucket_size = 0;
     int max_batch_size = 0;
 
-    int current_probe_idx = -1;
-    uint32_t current_build_idx = 0;
-
     std::vector<uint32_t> first;
     std::vector<uint32_t> next;
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 248e8f42328..6a21086f50e 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -193,6 +193,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                                                              Block* 
output_block,
                                                              size_t 
probe_rows) {
     auto& probe_index = _parent->_probe_index;
+    auto& build_index = _parent->_build_index;
 
     using Mapped = typename HashTableType::Mapped;
 
@@ -233,11 +234,12 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
 
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        auto [new_probe_idx, new_current_offset] =
+        auto [new_probe_idx, new_build_idx, new_current_offset] =
                 hash_table_ctx.hash_table->template find_batch<JoinOpType>(
                         hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
-                        probe_rows, _probe_indexs.data(), 
_build_indexs.data());
+                        build_index, probe_rows, _probe_indexs.data(), 
_build_indexs.data());
         probe_index = new_probe_idx;
+        build_index = new_build_idx;
         current_offset = new_current_offset;
         probe_size = probe_index - last_probe_index;
     }
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 5633d606d01..7988cc598bd 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -386,6 +386,7 @@ private:
     bool _has_set_need_null_map_for_build = false;
     bool _probe_ignore_null = false;
     int _probe_index = -1;
+    uint32_t _build_index = 0;
     bool _ready_probe = false;
     bool _probe_eos = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to