This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bf16228851 [fix](hashjoin) join produce blocks with rows larger than 
batch size (#16166)
bf16228851 is described below

commit bf16228851c569259f2a8d10cd91aa4018199848
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Feb 1 16:02:31 2023 +0800

    [fix](hashjoin) join produce blocks with rows larger than batch size 
(#16166)
    
    * [fix](hashjoin) join produce blocks with rows larger than batch size
    
    * fix
---
 .../vec/exec/join/process_hash_table_probe_impl.h  | 241 +++++++++++++--------
 be/src/vec/exec/join/vhash_join_node.cpp           |   2 +
 be/src/vec/exec/join/vhash_join_node.h             |   6 +
 3 files changed, 154 insertions(+), 95 deletions(-)

diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 096a54ed43..238a3de0b4 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -130,7 +130,6 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
         if (output_slot_flags[i]) {
             auto& column = probe_block.get_by_position(i).column;
             if (all_match_one) {
-                DCHECK_EQ(probe_size, column->size() - last_probe_index);
                 mcol[i]->insert_range_from(*column, last_probe_index, 
probe_size);
             } else {
                 DCHECK_GE(_items_counts.size(), last_probe_index + probe_size);
@@ -209,124 +208,177 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
 
     bool all_match_one = true;
     int last_probe_index = probe_index;
+    size_t probe_size = 0;
+    auto& probe_row_match_iter =
+            
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        while (probe_index < probe_rows) {
-            if constexpr (ignore_null && need_null_map_for_probe) {
-                if ((*null_map)[probe_index]) {
-                    if constexpr (probe_all) {
-                        _items_counts[probe_index++] = (uint32_t)1;
-                        // only full outer / left outer need insert the data 
of right table
-                        if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = -1;
-                            _build_block_rows[current_offset] = -1;
-                        } else {
-                            _build_block_offsets.emplace_back(-1);
-                            _build_block_rows.emplace_back(-1);
-                        }
-                        ++current_offset;
+        if constexpr (!is_right_semi_anti_join) {
+            // handle remaining matched rows from last probe row
+            if (probe_row_match_iter.ok()) {
+                for (; probe_row_match_iter.ok() && current_offset < 
_batch_size;
+                     ++probe_row_match_iter) {
+                    if (LIKELY(current_offset < _build_block_rows.size())) {
+                        _build_block_offsets[current_offset] = 
probe_row_match_iter->block_offset;
+                        _build_block_rows[current_offset] = 
probe_row_match_iter->row_num;
                     } else {
-                        _items_counts[probe_index++] = (uint32_t)0;
+                        
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+                        
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
                     }
-                    all_match_one = false;
-                    continue;
-                }
-            }
-            int last_offset = current_offset;
-            auto find_result =
-                    !need_null_map_for_probe
-                            ? key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena)
-                    : (*null_map)[probe_index]
-                            ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
-                                                           *_arena)) {nullptr, 
false}
-                            : key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena);
-            if (probe_index + PREFETCH_STEP < probe_rows) {
-                key_getter.template prefetch<true>(hash_table_ctx.hash_table,
-                                                   probe_index + 
PREFETCH_STEP, *_arena);
-            }
-
-            if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                          JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-                if (is_mark_join) {
                     ++current_offset;
-                    
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
-                            .get_data()
-                            .template push_back(!find_result.is_found());
-                } else {
-                    if (!find_result.is_found()) {
-                        ++current_offset;
-                    }
                 }
-            } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
-                if (is_mark_join) {
-                    ++current_offset;
-                    
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
-                            .get_data()
-                            .template push_back(find_result.is_found());
-                } else {
-                    if (find_result.is_found()) {
-                        ++current_offset;
-                    }
+                _items_counts[probe_index] = current_offset;
+                all_match_one &= (current_offset == 1);
+                if (!probe_row_match_iter.ok()) {
+                    ++probe_index;
                 }
-            } else {
-                DCHECK(!is_mark_join);
-                if (find_result.is_found()) {
-                    auto& mapped = find_result.get_mapped();
-                    // TODO: Iterators are currently considered to be a heavy 
operation and have a certain impact on performance.
-                    // We should rethink whether to use this iterator mode in 
the future. Now just opt the one row case
-                    if (mapped.get_row_count() == 1) {
-                        if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
-                            mapped.visited = true;
-                        }
+                probe_size = 1;
+            }
+        }
 
-                        if constexpr (!is_right_semi_anti_join) {
+        if (current_offset < _batch_size) {
+            while (probe_index < probe_rows) {
+                if constexpr (ignore_null && need_null_map_for_probe) {
+                    if ((*null_map)[probe_index]) {
+                        if constexpr (probe_all) {
+                            _items_counts[probe_index++] = (uint32_t)1;
+                            // only full outer / left outer need insert the 
data of right table
                             if (LIKELY(current_offset < 
_build_block_rows.size())) {
-                                _build_block_offsets[current_offset] = 
mapped.block_offset;
-                                _build_block_rows[current_offset] = 
mapped.row_num;
+                                _build_block_offsets[current_offset] = -1;
+                                _build_block_rows[current_offset] = -1;
                             } else {
-                                
_build_block_offsets.emplace_back(mapped.block_offset);
-                                _build_block_rows.emplace_back(mapped.row_num);
+                                _build_block_offsets.emplace_back(-1);
+                                _build_block_rows.emplace_back(-1);
+                            }
+                            ++current_offset;
+                        } else {
+                            _items_counts[probe_index++] = (uint32_t)0;
+                        }
+                        all_match_one = false;
+                        if constexpr (probe_all) {
+                            if (current_offset >= _batch_size) {
+                                break;
                             }
+                        }
+                        continue;
+                    }
+                }
+                int last_offset = current_offset;
+                auto find_result = !need_null_map_for_probe
+                                           ? 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena)
+                                   : (*null_map)[probe_index]
+                                           ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+                                                                          
probe_index,
+                                                                          
*_arena)) {nullptr, false}
+                                           : 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena);
+                if (probe_index + PREFETCH_STEP < probe_rows) {
+                    key_getter.template 
prefetch<true>(hash_table_ctx.hash_table,
+                                                       probe_index + 
PREFETCH_STEP, *_arena);
+                }
+
+                auto current_probe_index = probe_index;
+                if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+                              JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+                    if (is_mark_join) {
+                        ++current_offset;
+                        
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
+                                .get_data()
+                                .template push_back(!find_result.is_found());
+                    } else {
+                        if (!find_result.is_found()) {
                             ++current_offset;
                         }
+                    }
+                    ++probe_index;
+                } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
+                    if (is_mark_join) {
+                        ++current_offset;
+                        
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
+                                .get_data()
+                                .template push_back(find_result.is_found());
                     } else {
-                        for (auto it = mapped.begin(); it.ok(); ++it) {
+                        if (find_result.is_found()) {
+                            ++current_offset;
+                        }
+                    }
+                    ++probe_index;
+                } else {
+                    DCHECK(!is_mark_join);
+                    if (find_result.is_found()) {
+                        auto& mapped = find_result.get_mapped();
+                        // TODO: Iterators are currently considered to be a 
heavy operation and have a certain impact on performance.
+                        // We should rethink whether to use this iterator mode 
in the future. Now just opt the one row case
+                        if (mapped.get_row_count() == 1) {
+                            if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
+                                mapped.visited = true;
+                            }
+
                             if constexpr (!is_right_semi_anti_join) {
                                 if (LIKELY(current_offset < 
_build_block_rows.size())) {
-                                    _build_block_offsets[current_offset] = 
it->block_offset;
-                                    _build_block_rows[current_offset] = 
it->row_num;
+                                    _build_block_offsets[current_offset] = 
mapped.block_offset;
+                                    _build_block_rows[current_offset] = 
mapped.row_num;
                                 } else {
-                                    
_build_block_offsets.emplace_back(it->block_offset);
-                                    
_build_block_rows.emplace_back(it->row_num);
+                                    
_build_block_offsets.emplace_back(mapped.block_offset);
+                                    
_build_block_rows.emplace_back(mapped.row_num);
                                 }
                                 ++current_offset;
                             }
-                        }
-                        if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
-                            mapped.visited = true;
-                        }
-                    }
-                } else {
-                    if constexpr (probe_all) {
-                        // only full outer / left outer need insert the data 
of right table
-                        if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = -1;
-                            _build_block_rows[current_offset] = -1;
+                            ++probe_index;
                         } else {
-                            _build_block_offsets.emplace_back(-1);
-                            _build_block_rows.emplace_back(-1);
+                            if constexpr (!is_right_semi_anti_join) {
+                                auto it = mapped.begin();
+                                for (; it.ok() && current_offset < 
_batch_size; ++it) {
+                                    if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                        _build_block_offsets[current_offset] = 
it->block_offset;
+                                        _build_block_rows[current_offset] = 
it->row_num;
+                                    } else {
+                                        
_build_block_offsets.emplace_back(it->block_offset);
+                                        
_build_block_rows.emplace_back(it->row_num);
+                                    }
+                                    ++current_offset;
+                                }
+                                probe_row_match_iter = it;
+                                if (!it.ok()) {
+                                    // If all matched rows for the current 
probe row are handled,
+                                    // advance to next probe row.
+                                    // If not (which means it exceeds batch 
size), probe_index is not increased and
+                                    // remaining matched rows for the current 
probe row will be
+                                    // handled in the next call of this 
function
+                                    ++probe_index;
+                                }
+                            } else {
+                                ++probe_index;
+                            }
+                            if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
+                                mapped.visited = true;
+                            }
                         }
-                        ++current_offset;
+                    } else {
+                        if constexpr (probe_all) {
+                            // only full outer / left outer need insert the 
data of right table
+                            if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                _build_block_offsets[current_offset] = -1;
+                                _build_block_rows[current_offset] = -1;
+                            } else {
+                                _build_block_offsets.emplace_back(-1);
+                                _build_block_rows.emplace_back(-1);
+                            }
+                            ++current_offset;
+                        }
+                        ++probe_index;
                     }
                 }
-            }
 
-            uint32_t count = (uint32_t)(current_offset - last_offset);
-            _items_counts[probe_index++] = count;
-            all_match_one &= (count == 1);
-            if (current_offset >= _batch_size && !all_match_one) {
-                break;
+                uint32_t count = (uint32_t)(current_offset - last_offset);
+                _items_counts[current_probe_index] = count;
+                all_match_one &= (count == 1);
+                if (current_offset >= _batch_size) {
+                    break;
+                }
             }
+            probe_size = probe_index - last_probe_index + 
(probe_row_match_iter.ok() ? 1 : 0);
         }
     }
 
@@ -340,8 +392,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                   JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
         SCOPED_TIMER(_probe_side_output_timer);
         probe_side_output_column(mcol, _join_node->_left_output_slot_flags, 
current_offset,
-                                 last_probe_index, probe_index - 
last_probe_index, all_match_one,
-                                 false);
+                                 last_probe_index, probe_size, all_match_one, 
false);
     }
 
     output_block->swap(mutable_block.to_block());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 7cdd7248e2..658be0df1f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1027,6 +1027,8 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
                                                    JoinOpType::value == 
TJoinOp::RIGHT_OUTER_JOIN ||
                                                    JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN,
                                            RowRefListWithFlag, RowRefList>>;
+                
_probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>();
+
                 if (_build_expr_ctxs.size() == 1 && 
!_store_null_in_hash_table[0]) {
                     // Single column optimization
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index ea72ac7883..9287cd6cfb 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -187,6 +187,10 @@ using HashTableCtxVariants =
                      ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
                      
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
 
+using HashTableIteratorVariants =
+        std::variant<std::monostate, ForwardIterator<RowRefList>,
+                     ForwardIterator<RowRefListWithFlag>, 
ForwardIterator<RowRefListWithFlags>>;
+
 class HashJoinNode final : public VJoinNodeBase {
 public:
     // TODO: Best prefetch step is decided by machine. We should also provide a
@@ -278,6 +282,8 @@ private:
     // for full/right outer join
     ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
 
+    HashTableIteratorVariants _probe_row_match_iter;
+
     std::shared_ptr<std::vector<Block>> _build_blocks;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to