This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b5da3f74f5 [improvement](join) avoid unnecessary copying in _build_output_block (#21360) b5da3f74f5 is described below commit b5da3f74f507752fd6327e7e9af14f7e8052bfcb Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Tue Jul 4 12:13:49 2023 +0800 [improvement](join) avoid unnecessary copying in _build_output_block (#21360) If the source columns in a temporary block are exclusively owned (not shared with any other block), there is no need to duplicate the data. --- be/src/vec/exec/join/vhash_join_node.cpp | 7 +++- be/src/vec/exec/join/vjoin_node_base.cpp | 44 ++++++++++++++++++------- be/src/vec/exec/join/vjoin_node_base.h | 2 +- be/src/vec/exec/join/vnested_loop_join_node.cpp | 6 +++- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 678d2f6483..074e210bf5 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -623,7 +623,12 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns())); } - RETURN_IF_ERROR(_build_output_block(&temp_block, output_block)); + + // Here make _join_block release the columns' ptr + _join_block.set_columns(_join_block.clone_empty_columns()); + mutable_join_block.clear(); + + RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); _reset_tuple_is_null_column(); reached_limit(output_block, eos); return Status::OK(); diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 57870a0ac8..a4e1493d58 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -33,6 +33,7 @@ #include "util/telemetry/telemetry.h" #include "util/threadpool.h" #include "vec/columns/column.h" +#include 
"vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" @@ -149,7 +150,8 @@ void VJoinNodeBase::_construct_mutable_join_block() { } } -Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) { +Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block, + bool keep_origin) { SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = @@ -160,13 +162,21 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo // TODO: After FE plan support same nullable of output expr and origin block and mutable column // we should replace `insert_column_datas` by `insert_range_from` - auto insert_column_datas = [](auto& to, const auto& from, size_t rows) { - if (to->is_nullable() && !from.is_nullable()) { - auto& null_column = reinterpret_cast<ColumnNullable&>(*to); - null_column.get_nested_column().insert_range_from(from, 0, rows); - null_column.get_null_map_column().get_data().resize_fill(rows, 0); + auto insert_column_datas = [keep_origin](auto& to, ColumnPtr& from, size_t rows) { + if (to->is_nullable() && !from->is_nullable()) { + if (keep_origin || !from->is_exclusive()) { + auto& null_column = reinterpret_cast<ColumnNullable&>(*to); + null_column.get_nested_column().insert_range_from(*from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to = make_nullable(from, false)->assume_mutable(); + } } else { - to->insert_range_from(from, 0, rows); + if (keep_origin || !from->is_exclusive()) { + to->insert_range_from(*from, 0, rows); + } else { + to = from->assume_mutable(); + } } }; if (rows != 0) { @@ -174,7 +184,7 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo if (_output_expr_ctxs.empty()) { DCHECK(mutable_columns.size() == row_desc().num_materialized_slots()); for (int i = 
0; i < mutable_columns.size(); ++i) { - insert_column_datas(mutable_columns[i], *origin_block->get_by_position(i).column, + insert_column_datas(mutable_columns[i], origin_block->get_by_position(i).column, rows); } } else { @@ -183,13 +193,23 @@ Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_blo for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id)); - auto column_ptr = origin_block->get_by_position(result_column_id) - .column->convert_to_full_column_if_const(); - insert_column_datas(mutable_columns[i], *column_ptr, rows); + auto& origin_column = origin_block->get_by_position(result_column_id).column; + + /// `convert_to_full_column_if_const` will create a pointer to the origin column if + /// the origin column is not ColumnConst/ColumnArray, this make the column be not + /// exclusive. + /// TODO: maybe need a method to check if a column need to be converted to full + /// column. + if (is_column_const(*origin_column) || check_column<ColumnArray>(origin_column)) { + auto column_ptr = origin_column->convert_to_full_column_if_const(); + insert_column_datas(mutable_columns[i], column_ptr, rows); + } else { + insert_column_datas(mutable_columns[i], origin_column, rows); + } } } - if (!is_mem_reuse) { + if (!is_mem_reuse || !keep_origin) { output_block->swap(mutable_block.to_block()); } DCHECK(output_block->rows() == rows); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index f29897719d..120e77785e 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -80,7 +80,7 @@ protected: // Convert the intermediate blocks to the final result. For example, if the block from probe // side is non-nullable and the join op is righter outer join, we need to convert the non-nullable // columns from probe side to a nullable column. 
- Status _build_output_block(Block* origin_block, Block* output_block); + Status _build_output_block(Block* origin_block, Block* output_block, bool keep_origin = true); // Open probe side asynchronously. void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* status); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 0b50860874..a3b6c30c9f 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -672,13 +672,17 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, { Block tmp_block = _join_block; + + // Here make _join_block release the columns' ptr + _join_block.set_columns(_join_block.clone_empty_columns()); + _add_tuple_is_null_column(&tmp_block); { SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR( VExprContext::filter_block(_conjuncts, &tmp_block, tmp_block.columns())); } - RETURN_IF_ERROR(_build_output_block(&tmp_block, block)); + RETURN_IF_ERROR(_build_output_block(&tmp_block, block, false)); _reset_tuple_is_null_column(); } _join_block.clear_column_data(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org