This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d00b7ad04b [Opt](performance) opt the outer join for nested loop join (#20524) d00b7ad04b is described below commit d00b7ad04bce8575beb973947da0f52716943632 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Jun 7 17:31:36 2023 +0800 [Opt](performance) opt the outer join for nested loop join (#20524) --- be/src/vec/exec/join/vnested_loop_join_node.cpp | 124 +++++++++++++++--------- be/src/vec/exec/join/vnested_loop_join_node.h | 82 +++++++++------- 2 files changed, 122 insertions(+), 84 deletions(-) diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index b8160b2ade..17cd2ce22a 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -97,7 +97,6 @@ private: VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VJoinNodeBase(pool, tnode, descs), - _cur_probe_row_visited_flags(false), _matched_rows_done(false), _left_block_pos(0), _left_side_eos(false), @@ -232,6 +231,9 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block* Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* block, bool eos) { COUNTER_UPDATE(_probe_rows_counter, block->rows()); + _cur_probe_row_visited_flags.resize(block->rows()); + std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 0); + _left_block_pos = 0; _need_more_input_data = false; _left_side_eos = eos; @@ -287,22 +289,24 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast<ColumnNullable*>(dst_columns[i].get()) ->get_nested_column_ptr() - ->insert_many_from(*src_column.column, _left_block_pos, 1); + ->insert_range_from(*src_column.column, _left_block_start_pos, + _left_side_process_count); assert_cast<ColumnNullable*>(dst_columns[i].get()) ->get_null_map_column() .get_data() .resize_fill(origin_sz + 1, 0); } else { - dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1); + dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, + _left_side_process_count); } } for (size_t i = 0; i < _num_build_side_columns; ++i) { - dst_columns[_num_probe_side_columns + i]->insert_default(); + dst_columns[_num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count); } IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>( *dst_columns[dst_columns.size() - 1]) .get_data(); - mark_data.resize_fill(mark_data.size() + 1, 0); + mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0); } void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, @@ -457,54 +461,58 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s } _output_null_idx_build_side = i; } else { - if constexpr (IsSemi) { - if (!_cur_probe_row_visited_flags && !_is_mark_join) { - return; + if (!_is_mark_join) { + auto new_size = column_size; + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + for (int j = _left_block_start_pos; + j < _left_block_start_pos + _left_side_process_count; ++j) { + if (_cur_probe_row_visited_flags[j] == IsSemi) { + new_size++; + for (size_t i = 0; i < _num_probe_side_columns; ++i) { + const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { + DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast<ColumnNullable*>(dst_columns[i].get()) + ->get_nested_column_ptr() + ->insert_many_from(*src_column.column, j, 1); + assert_cast<ColumnNullable*>(dst_columns[i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(new_size, 0); + } else { + dst_columns[i]->insert_many_from(*src_column.column, j, 1); + } + } + } } - } else { - if (_cur_probe_row_visited_flags && !_is_mark_join) { - return; + if (new_size > column_size) { + for (size_t i = 0; i < _num_build_side_columns; ++i) { + dst_columns[_num_probe_side_columns + i]->insert_many_defaults(new_size - + column_size); + } + _resize_fill_tuple_is_null_column(new_size, 0, 1); } - } - - auto new_size = column_size + 1; - if (_is_mark_join) { + } else { IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>( *dst_columns[dst_columns.size() - 1]) .get_data(); - mark_data.resize_fill(mark_data.size() + 1, - (IsSemi && !_cur_probe_row_visited_flags) || - (!IsSemi && _cur_probe_row_visited_flags) - ? 0 - : 1); - } - - DCHECK_LT(_left_block_pos, _left_block.rows()); - for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); - if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { - DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN); - assert_cast<ColumnNullable*>(dst_columns[i].get()) - ->get_nested_column_ptr() - ->insert_many_from(*src_column.column, _left_block_pos, 1); - assert_cast<ColumnNullable*>(dst_columns[i].get()) - ->get_null_map_column() - .get_data() - .resize_fill(new_size, 0); - } else { - dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1); + mark_data.reserve(mark_data.size() + _left_side_process_count); + DCHECK_LT(_left_block_pos, _left_block.rows()); + for (int j = _left_block_start_pos; + j < _left_block_start_pos + _left_side_process_count; ++j) { + mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]); + for (size_t i = 0; i < _num_probe_side_columns; ++i) { + const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN); + dst_columns[i]->insert_from(*src_column.column, j); + } } } - for (size_t i = 0; i < _num_build_side_columns; ++i) { - dst_columns[_num_probe_side_columns + i]->insert_default(); - } - _resize_fill_tuple_is_null_column(new_size, 0, 1); } } void VNestedLoopJoinNode::_reset_with_next_probe_row() { // TODO: need a vector of left block to register the _probe_row_visited_flags - _cur_probe_row_visited_flags = false; _current_build_pos = 0; _left_block_pos++; } @@ -526,8 +534,8 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( ->get_data(); auto* __restrict build_side_flag_data = build_side_flag.data(); auto cur_sz = build_side_flag.size(); - const size_t offset = _offset_stack.top(); - _offset_stack.pop(); + const size_t offset = _build_offset_stack.top(); + _build_offset_stack.pop(); for (size_t j = 0; j < cur_sz; j++) { build_side_flag_data[j] |= filter[offset + j]; } @@ -535,7 +543,20 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( } } if constexpr (SetProbeSideFlag) { - _cur_probe_row_visited_flags |= simd::contain_byte<uint8>(filter.data(), filter.size(), 1); + int end = filter.size(); + for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos; + i >= _left_block_start_pos; i--) { + int offset = 0; + if (!_probe_offset_stack.empty()) { + offset = _probe_offset_stack.top(); + _probe_offset_stack.pop(); + } + if (!_cur_probe_row_visited_flags[i]) { + _cur_probe_row_visited_flags[i] = + simd::contain_byte<uint8>(filter.data() + offset, end - offset, 1) ? 1 : 0; + } + end = offset; + } } if (materialize) { Block::filter_block_internal(block, filter, column_to_keep); @@ -554,7 +575,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block, // 3. Use bool column to do filtering. size_t build_block_idx = _current_build_pos == 0 ? _build_blocks.size() - 1 : _current_build_pos - 1; - size_t processed_blocks_num = _offset_stack.size(); + size_t processed_blocks_num = _build_offset_stack.size(); if (LIKELY(!_join_conjuncts.empty() && block->rows() > 0)) { IColumn::Filter filter(block->rows(), 1); bool can_filter_all = false; @@ -563,6 +584,11 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block, if (can_filter_all) { CLEAR_BLOCK + std::stack<uint16_t> empty1; + _probe_offset_stack.swap(empty1); + + std::stack<uint16_t> empty2; + _build_offset_stack.swap(empty2); } else { _do_filtering_and_update_visited_flags_impl<decltype(filter), SetBuildSideFlag, SetProbeSideFlag>( @@ -577,14 +603,16 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block, ->get_data(); auto* __restrict build_side_flag_data = build_side_flag.data(); auto cur_sz = build_side_flag.size(); - _offset_stack.pop(); + _build_offset_stack.pop(); memset(reinterpret_cast<void*>(build_side_flag_data), 1, cur_sz); build_block_idx = build_block_idx == 0 ? _build_blocks.size() - 1 : build_block_idx - 1; } } if constexpr (SetProbeSideFlag) { - _cur_probe_row_visited_flags = true; + std::stack<uint16_t> empty; + _probe_offset_stack.swap(empty); + std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 1); } if (!materialize) { CLEAR_BLOCK @@ -676,7 +704,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, } bool VNestedLoopJoinNode::need_more_input_data() const { - return _need_more_input_data and !_left_side_eos; + return _need_more_input_data and !_left_side_eos and _join_block.rows() == 0; } void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) { diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 4bd66798d9..03676629bc 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -94,31 +94,46 @@ private: constexpr bool ignore_null = JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN; + _left_block_start_pos = _left_block_pos; + _left_side_process_count = 0; + DCHECK(!_need_more_input_data || !_matched_rows_done); MutableBlock mutable_join_block(&_join_block); + if (!_matched_rows_done && !_need_more_input_data) { + // We should try to join rows if there still are some rows from probe side. + while (_join_block.rows() < state->batch_size()) { + while (_current_build_pos == _build_blocks.size() || + _left_block_pos == _left_block.rows()) { + // if left block is empty(), do not need disprocess the left block rows + if (_left_block.rows() > _left_block_pos) { + _left_side_process_count++; + } - while (_join_block.rows() < state->batch_size() && !_matched_rows_done) { - // If this left block is exhausted or empty, we need to pull data from left child. - if (_left_block_pos == _left_block.rows()) { - if (_left_side_eos) { - _matched_rows_done = true; - } else { - _left_block_pos = 0; - _need_more_input_data = true; - return Status::OK(); + _reset_with_next_probe_row(); + if (_left_block_pos < _left_block.rows()) { + if constexpr (set_probe_side_flag) { + _probe_offset_stack.push(mutable_join_block.rows()); + } + } else { + if (_left_side_eos) { + _matched_rows_done = true; + } else { + _need_more_input_data = true; + } + break; + } } - } - // We should try to join rows if there still are some rows from probe side. - if (!_matched_rows_done && _current_build_pos < _build_blocks.size()) { - do { - const auto& now_process_build_block = _build_blocks[_current_build_pos++]; - if constexpr (set_build_side_flag) { - _offset_stack.push(mutable_join_block.rows()); - } - _process_left_child_block(mutable_join_block, now_process_build_block); - } while (_join_block.rows() < state->batch_size() && - _current_build_pos < _build_blocks.size()); + // Do not have left row need to be disposed + if (_matched_rows_done || _need_more_input_data) { + break; + } + + const auto& now_process_build_block = _build_blocks[_current_build_pos++]; + if constexpr (set_build_side_flag) { + _build_offset_stack.push(mutable_join_block.rows()); + } + _process_left_child_block(mutable_join_block, now_process_build_block); } if constexpr (set_probe_side_flag) { @@ -133,28 +148,20 @@ private: } mutable_join_block = MutableBlock(&_join_block); // If this join operation is left outer join or full outer join, when - // `_current_build_pos == _build_blocks.size()`, means all rows from build - // side have been joined with the current probe row, we should output current + // `_left_side_process_count`, means all rows from build + // side have been joined with _left_side_process_count, we should output current // probe row with null from build side. - if (_current_build_pos == _build_blocks.size()) { - if (!_matched_rows_done) { - _finalize_current_phase<false, - JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>( - mutable_join_block, state->batch_size()); - _reset_with_next_probe_row(); - } - break; + if (_left_side_process_count) { + _finalize_current_phase<false, JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>( + mutable_join_block, state->batch_size()); } } - if (!_matched_rows_done && _current_build_pos == _build_blocks.size()) { + if (_left_side_process_count) { if (_is_mark_join && _build_blocks.empty()) { DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN); _append_left_data_with_null(mutable_join_block); - _reset_with_next_probe_row(); - break; } - _reset_with_next_probe_row(); } } @@ -221,7 +228,7 @@ private: // Visited flags for each row in build side. MutableColumns _build_side_visited_flags; // Visited flags for current row in probe side. - bool _cur_probe_row_visited_flags; + std::vector<int8_t> _cur_probe_row_visited_flags; size_t _current_build_pos = 0; size_t _num_probe_side_columns = 0; @@ -238,8 +245,10 @@ private: // is responsible for. Block _left_block; + int _left_block_start_pos = 0; int _left_block_pos; // current scan pos in _left_block bool _left_side_eos; // if true, left child has no more rows to process + int _left_side_process_count = 0; bool _old_version_flag; @@ -249,7 +258,8 @@ private: VExprContextSPtrs _filter_src_expr_ctxs; bool _is_output_left_side_only = false; bool _need_more_input_data = true; - std::stack<uint16_t> _offset_stack; + std::stack<uint16_t> _build_offset_stack; + std::stack<uint16_t> _probe_offset_stack; VExprContextSPtrs _join_conjuncts; friend struct RuntimeFilterBuild; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org