This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bf16228851 [fix](hashjoin) join produce blocks with rows larger than batch size (#16166) bf16228851 is described below commit bf16228851c569259f2a8d10cd91aa4018199848 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Wed Feb 1 16:02:31 2023 +0800 [fix](hashjoin) join produce blocks with rows larger than batch size (#16166) * [fix](hashjoin) join produce blocks with rows larger than batch size * fix --- .../vec/exec/join/process_hash_table_probe_impl.h | 241 +++++++++++++-------- be/src/vec/exec/join/vhash_join_node.cpp | 2 + be/src/vec/exec/join/vhash_join_node.h | 6 + 3 files changed, 154 insertions(+), 95 deletions(-) diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 096a54ed43..238a3de0b4 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -130,7 +130,6 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( if (output_slot_flags[i]) { auto& column = probe_block.get_by_position(i).column; if (all_match_one) { - DCHECK_EQ(probe_size, column->size() - last_probe_index); mcol[i]->insert_range_from(*column, last_probe_index, probe_size); } else { DCHECK_GE(_items_counts.size(), last_probe_index + probe_size); @@ -209,124 +208,177 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c bool all_match_one = true; int last_probe_index = probe_index; + size_t probe_size = 0; + auto& probe_row_match_iter = + std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter); { SCOPED_TIMER(_search_hashtable_timer); - while (probe_index < probe_rows) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; + if constexpr (!is_right_semi_anti_join) { + // handle ramaining matched rows from last probe row + if (probe_row_match_iter.ok()) { + for (; probe_row_match_iter.ok() && current_offset < _batch_size; + ++probe_row_match_iter) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; + _build_block_rows[current_offset] = probe_row_match_iter->row_num; } else { - _items_counts[probe_index++] = (uint32_t)0; + _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); + _build_block_rows.emplace_back(probe_row_match_iter->row_num); } - all_match_one = false; - continue; - } - } - int last_offset = current_offset; - auto find_result = - !need_null_map_for_probe - ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena) - : (*null_map)[probe_index] - ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, - *_arena)) {nullptr, false} - : key_getter.find_key(hash_table_ctx.hash_table, probe_index, *_arena); - if (probe_index + PREFETCH_STEP < probe_rows) { - key_getter.template prefetch<true>(hash_table_ctx.hash_table, - probe_index + PREFETCH_STEP, *_arena); - } - - if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - if (is_mark_join) { ++current_offset; - assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(!find_result.is_found()); - } else { - if (!find_result.is_found()) { - ++current_offset; - } } - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (is_mark_join) { - ++current_offset; - assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(find_result.is_found()); - } else { - if (find_result.is_found()) { - ++current_offset; - } + _items_counts[probe_index] = current_offset; + all_match_one &= (current_offset == 1); + if (!probe_row_match_iter.ok()) { + ++probe_index; } - } else { - DCHECK(!is_mark_join); - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } + probe_size = 1; + } + } - if constexpr (!is_right_semi_anti_join) { + if (current_offset < _batch_size) { + while (probe_index < probe_rows) { + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + _items_counts[probe_index++] = (uint32_t)1; + // only full outer / left outer need insert the data of right table if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } else { + _items_counts[probe_index++] = (uint32_t)0; + } + all_match_one = false; + if constexpr (probe_all) { + if (current_offset >= _batch_size) { + break; } + } + continue; + } + } + int last_offset = current_offset; + auto find_result = !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, + probe_index, *_arena) + : (*null_map)[probe_index] + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, + probe_index, + *_arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, + probe_index, *_arena); + if (probe_index + PREFETCH_STEP < probe_rows) { + key_getter.template prefetch<true>(hash_table_ctx.hash_table, + probe_index + PREFETCH_STEP, *_arena); + } + + auto current_probe_index = probe_index; + if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (is_mark_join) { + ++current_offset; + assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) + .get_data() + .template push_back(!find_result.is_found()); + } else { + if (!find_result.is_found()) { ++current_offset; } + } + ++probe_index; + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + if (is_mark_join) { + ++current_offset; + assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) + .get_data() + .template push_back(find_result.is_found()); } else { - for (auto it = mapped.begin(); it.ok(); ++it) { + if (find_result.is_found()) { + ++current_offset; + } + } + ++probe_index; + } else { + DCHECK(!is_mark_join); + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + mapped.visited = true; + } + if constexpr (!is_right_semi_anti_join) { if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); } ++current_offset; } - } - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } - } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; + ++probe_index; } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); + if constexpr (!is_right_semi_anti_join) { + auto it = mapped.begin(); + for (; it.ok() && current_offset < _batch_size; ++it) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + } + probe_row_match_iter = it; + if (!it.ok()) { + // If all matched rows for the current probe row are handled, + // advance to next probe row. + // If not(which means it excceed batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + ++probe_index; + } + } else { + ++probe_index; + } + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + mapped.visited = true; + } } - ++current_offset; + } else { + if constexpr (probe_all) { + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } + ++probe_index; } } - } - uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[probe_index++] = count; - all_match_one &= (count == 1); - if (current_offset >= _batch_size && !all_match_one) { - break; + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[current_probe_index] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size) { + break; + } } + probe_size = probe_index - last_probe_index + (probe_row_match_iter.ok() ? 1 : 0); } } @@ -340,8 +392,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { SCOPED_TIMER(_probe_side_output_timer); probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_index - last_probe_index, all_match_one, - false); + last_probe_index, probe_size, all_match_one, false); } output_block->swap(mutable_block.to_block()); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 7cdd7248e2..658be0df1f 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1027,6 +1027,8 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN, RowRefListWithFlag, RowRefList>>; + _probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>(); + if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) { // Single column optimization switch (_build_expr_ctxs[0]->root()->result_type()) { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ea72ac7883..9287cd6cfb 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -187,6 +187,10 @@ using HashTableCtxVariants = ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>, ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>; +using HashTableIteratorVariants = + std::variant<std::monostate, ForwardIterator<RowRefList>, + ForwardIterator<RowRefListWithFlag>, ForwardIterator<RowRefListWithFlags>>; + class HashJoinNode final : public VJoinNodeBase { public: // TODO: Best prefetch step is decided by machine. We should also provide a @@ -278,6 +282,8 @@ private: // for full/right outer join ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter; + HashTableIteratorVariants _probe_row_match_iter; + std::shared_ptr<std::vector<Block>> _build_blocks; Block _probe_block; ColumnRawPtrs _probe_columns; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org