This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch new_join2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f67d0b1635e8a8ac4e6ec406930987410f209e78
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Nov 27 11:32:24 2023 +0800

    [performance] opt the join and other join conjuncts (#27604)
---
 be/src/vec/common/hash_table/hash_map.h            | 57 ++++++++++++----------
 .../vec/exec/join/process_hash_table_probe_impl.h  |  1 +
 2 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 39872e205b3..657f91d29ad 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -257,18 +257,19 @@ public:
     }
 
     template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, 
bool need_judge_null>
-    auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
+    auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
build_idx_map,
                     int probe_idx, uint32_t build_idx, int probe_rows,
                     uint32_t* __restrict probe_idxs, bool& probe_visited,
                     uint32_t* __restrict build_idxs,
                     doris::vectorized::ColumnFilterHelper* mark_column) {
         if constexpr (is_mark_join) {
             return _find_batch_mark<JoinOpType, with_other_conjuncts>(
-                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs, 
build_idxs, mark_column);
+                    keys, build_idx_map, probe_idx, probe_rows, probe_idxs, 
build_idxs,
+                    mark_column);
         }
 
         if constexpr (with_other_conjuncts) {
-            return _find_batch_conjunct<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
+            return _find_batch_conjunct<JoinOpType>(keys, build_idx_map, 
probe_idx, build_idx,
                                                     probe_rows, probe_idxs, 
build_idxs);
         }
 
@@ -276,19 +277,19 @@ public:
                       JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
-            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
-                                                            probe_rows, 
probe_idxs, probe_visited,
-                                                            build_idxs);
+            return _find_batch_inner_outer_join<JoinOpType>(keys, 
build_idx_map, probe_idx,
+                                                            build_idx, 
probe_rows, probe_idxs,
+                                                            probe_visited, 
build_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
                       JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) 
{
             return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
-                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs);
+                    keys, build_idx_map, probe_idx, probe_rows, probe_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
-            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
+            return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, 
probe_rows);
         }
         return std::tuple {0, 0U, 0};
     }
@@ -317,10 +318,16 @@ public:
 
     bool has_null_key() { return _has_null_key; }
 
+    void pre_build_idxs(std::vector<uint32>& bucksets) {
+        for (uint32_t i = 0; i < bucksets.size(); i++) {
+            bucksets[i] = first[bucksets[i]];
+        }
+    }
+
 private:
     // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN 
support mark join
     template <int JoinOpType, bool with_other_conjuncts>
-    auto _find_batch_mark(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+    auto _find_batch_mark(const Key* __restrict keys, const uint32_t* 
__restrict build_idx_map,
                           int probe_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
                           uint32_t* __restrict build_idxs,
                           doris::vectorized::ColumnFilterHelper* mark_column) {
@@ -328,14 +335,14 @@ private:
         const auto batch_size = max_batch_size;
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            auto build_idx = first[bucket_nums[probe_idx]];
+            auto build_idx = build_idx_map[probe_idx];
 
             while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
                 build_idx = next[build_idx];
             }
 
             if constexpr (!with_other_conjuncts) {
-                if (bucket_nums[probe_idx] == bucket_size) {
+                if (!build_idx_map[probe_idx]) {
                     // mark result as null when probe row is null
                     mark_column->insert_null();
                 } else {
@@ -357,10 +364,10 @@ private:
     }
 
     auto _find_batch_right_semi_anti(const Key* __restrict keys,
-                                     const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                     const uint32_t* __restrict build_idx_map, 
int probe_idx,
                                      int probe_rows) {
         while (probe_idx < probe_rows) {
-            auto build_idx = first[bucket_nums[probe_idx]];
+            auto build_idx = build_idx_map[probe_idx];
 
             while (build_idx) {
                 if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
@@ -375,20 +382,20 @@ private:
 
     template <int JoinOpType, bool need_judge_null>
     auto _find_batch_left_semi_anti(const Key* __restrict keys,
-                                    const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                    const uint32_t* __restrict build_idx_map, 
int probe_idx,
                                     int probe_rows, uint32_t* __restrict 
probe_idxs) {
         auto matched_cnt = 0;
         const auto batch_size = max_batch_size;
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
             if constexpr (need_judge_null) {
-                if (bucket_nums[probe_idx] == bucket_size) {
+                if (!build_idx_map[probe_idx]) {
                     probe_idx++;
                     continue;
                 }
             }
 
-            auto build_idx = first[bucket_nums[probe_idx]];
+            auto build_idx = build_idx_map[probe_idx];
 
             while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
                 build_idx = next[build_idx];
@@ -402,31 +409,27 @@ private:
     }
 
     template <int JoinOpType>
-    auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+    auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* 
__restrict build_idx_map,
                               int probe_idx, uint32_t build_idx, int 
probe_rows,
                               uint32_t* __restrict probe_idxs, uint32_t* 
__restrict build_idxs) {
         auto matched_cnt = 0;
         const auto batch_size = max_batch_size;
 
         auto do_the_probe = [&]() {
-            auto matched_cnt_old = matched_cnt;
             while (build_idx && matched_cnt < batch_size) {
                 if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                               JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
                     if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
                         build_idxs[matched_cnt++] = build_idx;
                     }
-                } else {
+                } else if (keys[probe_idx] == build_keys[build_idx]) {
                     build_idxs[matched_cnt] = build_idx;
-                    matched_cnt += keys[probe_idx] == build_keys[build_idx];
+                    probe_idxs[matched_cnt] = probe_idx;
+                    matched_cnt++;
                 }
                 build_idx = next[build_idx];
             }
 
-            for (auto i = matched_cnt_old; i < matched_cnt; i++) {
-                probe_idxs[i] = probe_idx;
-            }
-
             if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                           JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                           JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
@@ -447,7 +450,7 @@ private:
         }
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            build_idx = first[bucket_nums[probe_idx]];
+            build_idx = build_idx_map[probe_idx];
             do_the_probe();
         }
 
@@ -457,7 +460,7 @@ private:
 
     template <int JoinOpType>
     auto _find_batch_inner_outer_join(const Key* __restrict keys,
-                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                      const uint32_t* __restrict 
build_idx_map, int probe_idx,
                                       uint32_t build_idx, int probe_rows,
                                       uint32_t* __restrict probe_idxs, bool& 
probe_visited,
                                       uint32_t* __restrict build_idxs) {
@@ -501,7 +504,7 @@ private:
         }
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            build_idx = first[bucket_nums[probe_idx]];
+            build_idx = build_idx_map[probe_idx];
             do_the_probe();
         }
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index d38189087c7..49c8b0dfbd7 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -135,6 +135,7 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
         hash_table_ctx.reset();
         hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
probe_rows, null_map, true,
                                             false, 
hash_table_ctx.hash_table->get_bucket_size());
+        hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums);
     }
     return typename HashTableType::State(_parent->_probe_columns);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to