This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/new_join by this push:
     new 707b7a4d671 opt for some join (#26513)
707b7a4d671 is described below

commit 707b7a4d6712d6b7c5cc993bab3f2a9c522e6930
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Nov 9 08:42:25 2023 +0800

    opt for some join (#26513)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp |  3 +-
 be/src/vec/columns/column_vector.cpp         | 13 +----
 be/src/vec/common/hash_table/hash_map.h      | 74 +++++++++++++++++++---------
 be/src/vec/exec/join/vhash_join_node.cpp     |  2 +-
 4 files changed, 56 insertions(+), 36 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c7af4f89ba4..748246a8f27 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -483,7 +483,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                                                 _build_expr_ctxs, 
_runtime_filter_descs);
 
                                 
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
-                                        state, arg.hash_table->size(), 0));
+                                        state, arg.hash_table->size(),
+                                        local_state._build_rf_cardinality));
                                 RETURN_IF_ERROR(
                                         
local_state._runtime_filter_slots->copy_from_shared_context(
                                                 _shared_hash_table_context));
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index 4575d089781..a825e07d5f2 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -397,17 +397,8 @@ void ColumnVector<T>::insert_indices_from_join(const IColumn& src, const uint32_
 
     const T* __restrict src_data = reinterpret_cast<const 
T*>(src.get_raw_data().data);
 
-    if constexpr (std::is_same_v<T, UInt8>) {
-        // nullmap : indices_begin[i] == 0 means is null at the here, set true 
here
-        for (uint32_t i = 0; i < new_size; ++i) {
-            data[origin_size + i] =
-                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * 
src_data[indices_begin[i]];
-        }
-    } else {
-        // real data : indices_begin[i] == 0 what at is meaningless
-        for (uint32_t i = 0; i < new_size; ++i) {
-            data[origin_size + i] = src_data[indices_begin[i]];
-        }
+    for (uint32_t i = 0; i < new_size; ++i) {
+        data[origin_size + i] = src_data[indices_begin[i]];
     }
 }
 
diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index a9506869875..d6c41bbc818 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -24,6 +24,7 @@
 
 #include <span>
 
+#include "common/compiler_util.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -213,7 +214,7 @@ public:
     using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
 
     static uint32_t calc_bucket_size(size_t num_elem) {
-        size_t expect_bucket_size = static_cast<size_t>(num_elem) + (num_elem 
- 1) / 7;
+        size_t expect_bucket_size = num_elem + (num_elem - 1) / 7;
         return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
     }
 
@@ -271,10 +272,14 @@ public:
         }
         if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
-            return _find_batch_right_semi_anti<with_other_conjuncts>(
-                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs, 
build_idxs);
+            if constexpr (!with_other_conjuncts) {
+                return _find_batch_right_semi_anti(keys, bucket_nums, 
probe_idx, probe_rows);
+            } else {
+                return _find_batch_right_semi_anti_conjunct(keys, bucket_nums, 
probe_idx, build_idx,
+                                                            probe_rows, 
probe_idxs, build_idxs);
+            }
         }
-        return std::tuple {0, 0u, 0};
+        return std::tuple {0, 0U, 0};
     }
 
     template <int JoinOpType>
@@ -300,30 +305,57 @@ public:
     }
 
 private:
-    template <bool with_other_conjuncts>
     auto _find_batch_right_semi_anti(const Key* __restrict keys,
                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
-                                     int probe_rows, uint32_t* __restrict 
probe_idxs,
-                                     uint32_t* __restrict build_idxs) {
+                                     int probe_rows) {
         auto matched_cnt = 0;
         while (probe_idx < probe_rows) {
             auto build_idx = first[bucket_nums[probe_idx]];
 
             while (build_idx) {
+                if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
+                    visited[build_idx] = 1;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
+    auto _find_batch_right_semi_anti_conjunct(const Key* __restrict keys,
+                                              const uint32_t* __restrict 
bucket_nums, int probe_idx,
+                                              uint32_t build_idx, int 
probe_rows,
+                                              uint32_t* __restrict probe_idxs,
+                                              uint32_t* __restrict build_idxs) 
{
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        auto do_the_probe = [&]() {
+            auto matched_cnt_old = matched_cnt;
+            while (build_idx && matched_cnt < batch_size) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
-                    if constexpr (with_other_conjuncts) {
-                        build_idxs[matched_cnt] = build_idx;
-                        probe_idxs[matched_cnt] = probe_idx;
-                        matched_cnt++;
-                    } else {
-                        visited[build_idx] = 1;
-                    }
+                    build_idxs[matched_cnt++] = build_idx;
                 }
                 build_idx = next[build_idx];
             }
+            for (auto i = matched_cnt_old; i < matched_cnt; i++) {
+                probe_idxs[i] = probe_idx;
+            }
             probe_idx++;
+        };
+
+        if (build_idx) {
+            do_the_probe();
+        }
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            build_idx = first[bucket_nums[probe_idx]];
+            do_the_probe();
         }
-        return std::tuple {probe_idx, 0u, matched_cnt};
+
+        probe_idx -= (matched_cnt == batch_size && build_idx);
+        return std::tuple {probe_idx, build_idx, matched_cnt};
     }
 
     template <int JoinOpType>
@@ -334,21 +366,17 @@ private:
         const auto batch_size = max_batch_size;
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            uint32_t bucket_num = bucket_nums[probe_idx];
-            auto build_idx = first[bucket_num];
+            auto build_idx = first[bucket_nums[probe_idx]];
 
-            while (build_idx) {
-                if (keys[probe_idx] == build_keys[build_idx]) {
-                    break;
-                }
+            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
                 build_idx = next[build_idx];
             }
             bool matched =
                     JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
+            probe_idxs[matched_cnt] = probe_idx++;
             matched_cnt += matched;
-            probe_idxs[matched_cnt - matched] = probe_idx++;
         }
-        return std::tuple {probe_idx, 0u, matched_cnt};
+        return std::tuple {probe_idx, 0U, matched_cnt};
     }
 
     template <int JoinOpType, bool with_other_conjuncts>
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 4d0ed465400..dc13bb44ee4 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -816,7 +816,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                                           _build_expr_ctxs, 
_runtime_filter_descs);
 
                                   RETURN_IF_ERROR(_runtime_filter_slots->init(
-                                          state, arg.hash_table->size(), 0));
+                                          state, arg.hash_table->size(), 
_build_rf_cardinality));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
                                           _shared_hash_table_context));
                                   
RETURN_IF_ERROR(_runtime_filter_slots->publish());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to