github-actions[bot] commented on code in PR #24461:
URL: https://github.com/apache/doris/pull/24461#discussion_r1333926608


##########
be/src/vec/common/columns_hashing.h:
##########
@@ -110,6 +107,14 @@
         }
     }
 
+    std::vector<StringRef> get_keys(size_t rows_number) const {
+        std::vector<StringRef> keys(rows_number);

Review Comment:
   warning: variable 'keys' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           std::vector<StringRef> keys = 0(rows_number);
   ```
   



##########
be/src/vec/common/columns_hashing.h:
##########
@@ -110,6 +107,14 @@ struct HashMethodString : public 
columns_hashing_impl::HashMethodBase<
         }
     }
 
+    std::vector<StringRef> get_keys(size_t rows_number) const {

Review Comment:
   warning: function 'get_keys' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] std::vector<StringRef> get_keys(size_t rows_number) const {
   ```
   



##########
be/src/vec/exec/join/process_hash_table_probe_impl.h:
##########
@@ -206,884 +205,655 @@ void 
ProcessHashTableProbe<JoinOpType>::_pre_serialize_key(
     }
 }
 
+template <int JoinOpType>
+template <typename KeyGetter>
+KeyGetter ProcessHashTableProbe<JoinOpType>::_init_probe_side(size_t 
probe_rows,
+                                                              bool 
with_other_join_conjuncts) {
+    _right_col_idx = _join_context->_is_right_semi_anti && 
!with_other_join_conjuncts
+                             ? 0
+                             : _join_context->_left_table_data_types->size();
+    _right_col_len = _join_context->_right_table_data_types->size();
+    _row_count_from_last_probe = 0;
+
+    _probe_indexs.clear();
+    _build_block_rows.clear();
+    _build_block_offsets.clear();
+    _visited_map.clear();
+    _same_to_prev.clear();
+    if (_build_block_rows.capacity() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
+        if (with_other_join_conjuncts) {
+            _probe_indexs.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+        }
+        _build_block_rows.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+        _build_block_offsets.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+    }
+
+    // use in right join to change visited state after
+    // exec the vother join conjunct
+    if (with_other_join_conjuncts) {
+        _visited_map.reserve(_batch_size * VISITED_MAP_EXPLODE_RATE);
+        _same_to_prev.reserve(_batch_size * VISITED_MAP_EXPLODE_RATE);
+    }
+
+    KeyGetter key_getter(*_join_context->_probe_columns, 
_join_context->_probe_key_sz, nullptr);
+
+    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
+        if (*_join_context->_probe_index == 0) {
+            _pre_serialize_key(*_join_context->_probe_columns, probe_rows, 
_probe_keys);
+        }
+        key_getter.set_serialized_keys(_probe_keys.data());
+    }
+
+    return key_getter;
+}
+
 template <int JoinOpType>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType>
+void ProcessHashTableProbe<JoinOpType>::_probe_hash(size_t probe_rows,
+                                                    HashTableType& 
hash_table_ctx,
+                                                    HashTableType::State 
key_getter,
+                                                    ConstNullMapPtr null_map) {
+    if (*(_join_context->_ready_probe_index) >= probe_rows) {
+        return;
+    }
+    _probe_side_hash_values.resize(probe_rows);
+    auto keys = key_getter.get_keys(probe_rows);
+    for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) 
{
+        if constexpr (ignore_null && need_null_map_for_probe) {
+            if ((*null_map)[k]) {
+                continue;
+            }
+        }
+        _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]);
+    }
+
+    using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 
0, *_arena));
+    _probe_side_find_result.resize(sizeof(FindResult) * probe_rows);
+    auto find_result_data = (FindResult*)_probe_side_find_result.data();
+
+    FindResult empty = {nullptr, false};

Review Comment:
   warning: variable 'empty' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       FindResult empty = 0 = {nullptr, false};
   ```
   



##########
be/src/vec/exec/join/process_hash_table_probe_impl.h:
##########
@@ -206,884 +205,655 @@
     }
 }
 
+template <int JoinOpType>
+template <typename KeyGetter>
+KeyGetter ProcessHashTableProbe<JoinOpType>::_init_probe_side(size_t 
probe_rows,
+                                                              bool 
with_other_join_conjuncts) {
+    _right_col_idx = _join_context->_is_right_semi_anti && 
!with_other_join_conjuncts
+                             ? 0
+                             : _join_context->_left_table_data_types->size();
+    _right_col_len = _join_context->_right_table_data_types->size();
+    _row_count_from_last_probe = 0;
+
+    _probe_indexs.clear();
+    _build_block_rows.clear();
+    _build_block_offsets.clear();
+    _visited_map.clear();
+    _same_to_prev.clear();
+    if (_build_block_rows.capacity() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
+        if (with_other_join_conjuncts) {
+            _probe_indexs.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+        }
+        _build_block_rows.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+        _build_block_offsets.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE);
+    }
+
+    // use in right join to change visited state after
+    // exec the vother join conjunct
+    if (with_other_join_conjuncts) {
+        _visited_map.reserve(_batch_size * VISITED_MAP_EXPLODE_RATE);
+        _same_to_prev.reserve(_batch_size * VISITED_MAP_EXPLODE_RATE);
+    }
+
+    KeyGetter key_getter(*_join_context->_probe_columns, 
_join_context->_probe_key_sz, nullptr);
+
+    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
+        if (*_join_context->_probe_index == 0) {
+            _pre_serialize_key(*_join_context->_probe_columns, probe_rows, 
_probe_keys);
+        }
+        key_getter.set_serialized_keys(_probe_keys.data());
+    }
+
+    return key_getter;
+}
+
 template <int JoinOpType>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType>
+void ProcessHashTableProbe<JoinOpType>::_probe_hash(size_t probe_rows,
+                                                    HashTableType& 
hash_table_ctx,
+                                                    HashTableType::State 
key_getter,
+                                                    ConstNullMapPtr null_map) {
+    if (*(_join_context->_ready_probe_index) >= probe_rows) {
+        return;
+    }
+    _probe_side_hash_values.resize(probe_rows);
+    auto keys = key_getter.get_keys(probe_rows);
+    for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) 
{
+        if constexpr (ignore_null && need_null_map_for_probe) {
+            if ((*null_map)[k]) {
+                continue;
+            }
+        }
+        _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]);
+    }
+
+    using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 
0, *_arena));
+    _probe_side_find_result.resize(sizeof(FindResult) * probe_rows);
+    auto find_result_data = (FindResult*)_probe_side_find_result.data();
+
+    FindResult empty = {nullptr, false};
+    for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) 
{
+        if constexpr (ignore_null && need_null_map_for_probe) {
+            if ((*null_map)[k]) {
+                continue;
+            }
+        }
+        find_result_data[k] =
+                (need_null_map_for_probe && (*null_map)[k])
+                        ? empty
+                        : 
key_getter.find_key_with_hash(hash_table_ctx.hash_table,
+                                                        
_probe_side_hash_values[k], keys[k]);
+
+        if (LIKELY(k + HASH_MAP_PREFETCH_DIST < probe_rows)) {
+            if constexpr (ignore_null && need_null_map_for_probe) {
+                if ((*null_map)[k + HASH_MAP_PREFETCH_DIST]) {
+                    continue;
+                }
+            }
+            key_getter.template prefetch_by_hash<true>(
+                    hash_table_ctx.hash_table, _probe_side_hash_values[k + 
HASH_MAP_PREFETCH_DIST]);
+        }
+    }
+    *(_join_context->_ready_probe_index) = probe_rows;
+}
+
+template <int JoinOpType>
+template <typename Mapped, bool with_other_join_conjuncts>
+ForwardIterator<Mapped>& 
ProcessHashTableProbe<JoinOpType>::_probe_row_match(int& current_offset,
+                                                                             
int& probe_index,
+                                                                             
size_t& probe_size,
+                                                                             
bool& all_match_one) {
+    auto& probe_row_match_iter =
+            
std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter);
+    if (!probe_row_match_iter.ok()) {
+        return probe_row_match_iter;
+    }
+
+    SCOPED_TIMER(_search_hashtable_timer);
+    for (; probe_row_match_iter.ok() && current_offset < _batch_size; 
++probe_row_match_iter) {
+        _emplace_element(probe_row_match_iter->block_offset, 
probe_row_match_iter->row_num,
+                         current_offset);
+        _probe_indexs.template emplace_back(probe_index);
+        if constexpr (with_other_join_conjuncts) {
+            _visited_map.emplace_back(&probe_row_match_iter->visited);
+        }
+    }
+
+    _row_count_from_last_probe = current_offset;
+    all_match_one &= (current_offset == 1);
+    if (!probe_row_match_iter.ok()) {
+        ++probe_index;
+    }
+    probe_size = 1;
+
+    return probe_row_match_iter;
+}
+
+template <int JoinOpType>
+void ProcessHashTableProbe<JoinOpType>::_emplace_element(int8_t block_offset, 
int32_t block_row,
+                                                         int& current_offset) {
+    _build_block_offsets.emplace_back(block_offset);
+    _build_block_rows.emplace_back(block_row);
+    current_offset++;
+}
+
+template <int JoinOpType>
+template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType,
+          bool with_other_conjuncts, bool is_mark_join>
 Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& 
hash_table_ctx,
                                                      ConstNullMapPtr null_map,
                                                      MutableBlock& 
mutable_block,
-                                                     Block* output_block, 
size_t probe_rows,
-                                                     bool is_mark_join) {
+                                                     Block* output_block, 
size_t probe_rows) {
     auto& probe_index = *_join_context->_probe_index;
-    auto& probe_raw_ptrs = *_join_context->_probe_columns;
 
-    _probe_indexs.resize(_batch_size);
-    if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
-        _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
-        _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
-    }
     using KeyGetter = typename HashTableType::State;
     using Mapped = typename HashTableType::Mapped;
 
-    int right_col_idx =
-            _join_context->_is_right_semi_anti ? 0 : 
_join_context->_left_table_data_types->size();
-    int right_col_len = _join_context->_right_table_data_types->size();
-
-    KeyGetter key_getter(probe_raw_ptrs, _join_context->_probe_key_sz, 
nullptr);
-
-    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
-        if (probe_index == 0) {
-            _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys);
-        }
-        key_getter.set_serialized_keys(_probe_keys.data());
-    }
+    KeyGetter key_getter = _init_probe_side<KeyGetter>(probe_rows, 
with_other_conjuncts);
 
     auto& mcol = mutable_block.mutable_columns();
-    int current_offset = 0;
 
     constexpr auto is_right_semi_anti_join =
             JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN;
 
     constexpr auto probe_all =
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
-    bool all_match_one = true;
     int last_probe_index = probe_index;
+
+    int current_offset = 0;
+    bool all_match_one = true;
     size_t probe_size = 0;
-    auto& probe_row_match_iter =
-            
std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter);
-    {
-        SCOPED_TIMER(_search_hashtable_timer);
-        if constexpr (!is_right_semi_anti_join) {
-            // handle ramaining matched rows from last probe row
-            if (probe_row_match_iter.ok()) {
-                for (; probe_row_match_iter.ok() && current_offset < 
_batch_size;
-                     ++probe_row_match_iter) {
-                    if (LIKELY(current_offset < _build_block_rows.size())) {
-                        _build_block_offsets[current_offset] = 
probe_row_match_iter->block_offset;
-                        _build_block_rows[current_offset] = 
probe_row_match_iter->row_num;
-                        _probe_indexs[current_offset] = probe_index;
-                    } else {
-                        
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
-                        
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
-                        _probe_indexs.template emplace_back(probe_index);
-                    }
-                    ++current_offset;
-                }
-                all_match_one &= (current_offset == 1);
-                if (!probe_row_match_iter.ok()) {
-                    ++probe_index;
-                }
-                probe_size = 1;
-            }
+    auto& probe_row_match_iter = _probe_row_match<Mapped, 
with_other_conjuncts>(
+            current_offset, probe_index, probe_size, all_match_one);
+
+    bool is_the_last_sub_block = false;
+    if (with_other_conjuncts && probe_size != 0) {
+        is_the_last_sub_block = !probe_row_match_iter.ok();
+        _same_to_prev.emplace_back(false);
+        for (int i = 0; i < current_offset - 1; ++i) {
+            _same_to_prev.emplace_back(true);
         }
+    }
 
-        auto empty = decltype(key_getter.find_key(hash_table_ctx.hash_table, 
probe_index,
-                                                  *_arena)) {nullptr, false};
+    int multi_matched_output_row_count = 0;
+    _probe_hash<need_null_map_for_probe, ignore_null, 
HashTableType>(probe_rows, hash_table_ctx,
+                                                                     
key_getter, null_map);
 
-        if (current_offset < _batch_size) {
-            if (*(_join_context->_ready_probe_index) < probe_rows) {
-                _probe_side_hash_values.resize(probe_rows);
-                for (size_t k = *(_join_context->_ready_probe_index); k < 
probe_rows; ++k) {
-                    if constexpr (ignore_null && need_null_map_for_probe) {
-                        if ((*null_map)[k]) {
-                            continue;
-                        }
+    using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 
0, *_arena));
+    auto find_result_data = (FindResult*)_probe_side_find_result.data();
+
+    while (current_offset < _batch_size && probe_index < probe_rows) {
+        if constexpr (ignore_null && need_null_map_for_probe) {
+            if ((*null_map)[probe_index]) {
+                if constexpr (probe_all) {
+                    // only full outer / left outer need insert the data of 
right table
+                    _emplace_element(-1, -1, current_offset);
+                    _probe_indexs.template emplace_back(probe_index);
+
+                    if constexpr (with_other_conjuncts) {
+                        _same_to_prev.emplace_back(false);
+                        _visited_map.emplace_back(nullptr);
                     }
-                    if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
-                                          KeyGetter>::value) {
-                        _probe_side_hash_values[k] = 
hash_table_ctx.hash_table.hash(
-                                key_getter.get_key_holder(k, *_arena).key);
-                    } else {
-                        _probe_side_hash_values[k] = 
hash_table_ctx.hash_table.hash(
-                                key_getter.get_key_holder(k, *_arena));
+                }
+                probe_index++;
+                all_match_one = false;
+                if constexpr (probe_all) {
+                    if (current_offset >= _batch_size) {
+                        break;
                     }
                 }
-                *(_join_context->_ready_probe_index) = probe_rows;
+                continue;
             }
-            while (probe_index < probe_rows) {
-                if constexpr (ignore_null && need_null_map_for_probe) {
-                    if ((*null_map)[probe_index]) {
-                        if constexpr (probe_all) {
-                            // only full outer / left outer need insert the 
data of right table
-                            if (LIKELY(current_offset < 
_build_block_rows.size())) {
-                                _build_block_offsets[current_offset] = -1;
-                                _build_block_rows[current_offset] = -1;
-                                _probe_indexs[current_offset] = probe_index;
-                            } else {
-                                _build_block_offsets.emplace_back(-1);
-                                _build_block_rows.emplace_back(-1);
-                                _probe_indexs.template 
emplace_back(probe_index);
-                            }
-                            ++current_offset;
-                        }
-                        probe_index++;
-                        all_match_one = false;
-                        if constexpr (probe_all) {
-                            if (current_offset >= _batch_size) {
-                                break;
-                            }
-                        }
-                        continue;
+        }
+
+        const auto& find_result = find_result_data[probe_index];
+        int last_offset = current_offset;
+        auto current_probe_index = probe_index;
+        if constexpr (!with_other_conjuncts && (JoinOpType == 
TJoinOp::LEFT_ANTI_JOIN ||
+                                                JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+                                                JoinOpType == 
TJoinOp::LEFT_SEMI_JOIN)) {
+            bool need_go_ahead = (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ 
find_result.is_found();

Review Comment:
   warning: variable 'need_go_ahead' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
               bool need_go_ahead = false = (JoinOpType != 
TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to