This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 24cce35bd59607e248457214d347c28a1a755b11 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Thu Jul 20 22:53:19 2023 +0800 [refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564) --- be/src/exec/exec_node.cpp | 9 ++------ be/src/vec/common/sort/partition_sorter.cpp | 11 +++------- be/src/vec/common/sort/sorter.cpp | 12 ++++------- be/src/vec/exec/vrepeat_node.cpp | 27 +++++------------------ be/src/vec/exec/vtable_function_node.cpp | 31 ++++++--------------------- be/src/vec/exec/vunion_node.cpp | 31 ++++----------------------- be/src/vec/runtime/vsorted_run_merger.cpp | 12 ++++------- be/src/vec/utils/util.hpp | 33 ++++++++++++++++++++++++++++- 8 files changed, 60 insertions(+), 106 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 938cbc2608..c6b7826deb 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -528,11 +528,8 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_projection_timer); using namespace vectorized; - auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = - is_mem_reuse ? MutableBlock(output_block) - : MutableBlock(VectorizedUtils::create_empty_columnswithtypename( - *_output_row_descriptor)); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); auto rows = origin_block->rows(); if (rows != 0) { @@ -552,9 +549,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); } } - - if (!is_mem_reuse) output_block->swap(mutable_block.to_block()); - DCHECK(output_block->rows() == rows); + DCHECK(mutable_block.rows() == rows); } return Status::OK(); diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index ca29d62eb4..1bffb5ed76 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -94,10 +94,9 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { const auto& sorted_block = _state->get_sorted_block()[0]; size_t num_columns = sorted_block.columns(); - bool mem_reuse = output_block->mem_reuse(); - MutableColumns merged_columns = - mem_reuse ? output_block->mutate_columns() : sorted_block.clone_empty_columns(); - + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block); + MutableColumns& merged_columns = m_block.mutable_columns(); size_t current_output_rows = 0; auto& priority_queue = _state->get_priority_queue(); @@ -189,10 +188,6 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } } - if (!mem_reuse) { - Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns)); - merge_block.swap(*output_block); - } _output_total_rows += output_block->rows(); if (current_output_rows == 0 || get_enough_data == true) { *eos = true; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 1254b311a0..ae8868ff2a 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -32,6 +32,7 @@ #include "runtime/thread_context.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" +#include "vec/core/block.h" #include "vec/core/block_spill_reader.h" #include "vec/core/block_spill_writer.h" #include "vec/core/column_with_type_and_name.h" @@ -39,6 +40,7 @@ #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr_context.h" +#include "vec/utils/util.hpp" namespace doris { class RowDescriptor; @@ -160,9 +162,8 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size, doris::vectorized::Block* block, bool* eos) { size_t num_columns = sorted_blocks_[0].columns(); - bool mem_reuse = block->mem_reuse(); - MutableColumns merged_columns = - mem_reuse ? block->mutate_columns() : sorted_blocks_[0].clone_empty_columns(); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]); + MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -191,11 +192,6 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size, return Status::OK(); } - if (!mem_reuse) { - Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns)); - merge_block.swap(*block); - } - return Status::OK(); } diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index d7499de803..58c993c8b3 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -38,11 +38,13 @@ #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" +#include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/utils/util.hpp" namespace doris::vectorized { VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -102,17 +104,10 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl size_t child_column_size = child_block->columns(); size_t column_size = _output_slots.size(); - bool mem_reuse = output_block->mem_reuse(); DCHECK_LT(child_column_size, column_size); - std::vector<vectorized::MutableColumnPtr> columns(column_size); - for (size_t i = 0; i < column_size; i++) { - if (mem_reuse) { - columns[i] = std::move(*output_block->get_by_position(i).column).mutate(); - } else { - columns[i] = _output_slots[i]->get_empty_mutable_column(); - } - } - + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots); + MutableColumns& columns = m_block.mutable_columns(); /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2)); * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1); * slot_id_set_list=[[0],[1]],repeat_id_idx=0, @@ -173,18 +168,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl DCHECK_EQ(cur_col, column_size); - if (!columns.empty() && !columns[0]->empty()) { - auto n_columns = 0; - if (!mem_reuse) { - for (const auto slot_desc : _output_slots) { - output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - } else { - columns.clear(); - } - } return Status::OK(); } diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index d3e967afcc..81fdef4389 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -29,10 +29,13 @@ #include <string> #include <utility> +#include "vec/columns/column.h" +#include "vec/core/block.h" #include "vec/exprs/table_function/table_function.h" #include "vec/exprs/table_function/table_function_factory.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/utils/util.hpp" namespace doris { class ObjectPool; @@ -153,18 +156,9 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) { - size_t column_size = _output_slots.size(); - bool mem_reuse = output_block->mem_reuse(); - - std::vector<MutableColumnPtr> columns(column_size); - for (size_t i = 0; i < column_size; i++) { - if (mem_reuse) { - columns[i] = std::move(*output_block->get_by_position(i).column).mutate(); - } else { - columns[i] = _output_slots[i]->get_empty_mutable_column(); - } - } - + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots); + MutableColumns& columns = m_block.mutable_columns(); for (int i = 0; i < _fn_num; i++) { if (columns[i + _child_slots.size()]->is_nullable()) { _fns[i]->set_nullable(); @@ -222,19 +216,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu columns[index]->insert_many_defaults(row_size - columns[index]->size()); } - if (!columns.empty() && !columns[0]->empty()) { - auto n_columns = 0; - if (!mem_reuse) { - for (const auto slot_desc : _output_slots) { - output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - } else { - columns.clear(); - } - } - // 3. eval conjuncts RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 7c079a3b46..e9caa494fd 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -151,11 +151,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { DCHECK(!reached_limit()); DCHECK_LT(_child_idx, _children.size()); - bool mem_reuse = block->mem_reuse(); - MutableBlock mblock = - mem_reuse ? MutableBlock::build_mutable_block(block) - : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( - _row_descriptor))); + MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); Block child_block; while (has_more_materialized() && mblock.rows() <= state->batch_size()) { @@ -202,10 +198,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { } } - if (!mem_reuse) { - block->swap(mblock.to_block()); - } - DCHECK_LE(_child_idx, _children.size()); return Status::OK(); } @@ -214,11 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { DCHECK_EQ(state->per_fragment_instance_idx(), 0); DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size()); - bool mem_reuse = block->mem_reuse(); - MutableBlock mblock = - mem_reuse ? MutableBlock::build_mutable_block(block) - : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( - _row_descriptor))); + MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size(); ++_const_expr_list_idx) { Block tmp_block; @@ -237,10 +225,6 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { } } - if (!mem_reuse) { - block->swap(mblock.to_block()); - } - // some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);" // the const expr will be in output expr cause the union node return a empty block. so here we // need add one row to make sure the union node exec const expr return at least one row @@ -257,19 +241,12 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id, vectorized::Block* output_block) { DCHECK_LT(child_id, _children.size()); DCHECK(!is_child_passthrough(child_id)); - bool mem_reuse = output_block->mem_reuse(); - MutableBlock mblock = - mem_reuse ? MutableBlock::build_mutable_block(output_block) - : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( - _row_descriptor))); - if (input_block->rows() > 0) { + MutableBlock mblock = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, _row_descriptor); Block res; RETURN_IF_ERROR(materialize_block(input_block, child_id, &res)); RETURN_IF_ERROR(mblock.merge(res)); - if (!mem_reuse) { - output_block->swap(mblock.to_block()); - } } return Status::OK(); } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index be7397ee0a..c45004081b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -24,6 +24,7 @@ #include "util/stopwatch.hpp" #include "vec/columns/column.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/utils/util.hpp" namespace doris { namespace vectorized { @@ -129,9 +130,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } else { size_t num_columns = _empty_block.columns(); - bool mem_reuse = output_block->mem_reuse(); - MutableColumns merged_columns = - mem_reuse ? output_block->mutate_columns() : _empty_block.clone_empty_columns(); + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block); + MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -154,11 +155,6 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { *eos = true; return Status::OK(); } - - if (!mem_reuse) { - Block merge_block = _empty_block.clone_with_columns(std::move(merged_columns)); - merge_block.swap(*output_block); - } } _num_rows_returned += output_block->rows(); diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index ba593c60cb..a57e5c1705 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -22,6 +22,7 @@ #include <boost/shared_ptr.hpp> #include "runtime/descriptors.h" +#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/core/block.h" #include "vec/exprs/vexpr.h" @@ -34,7 +35,37 @@ public: // Block block; return create_columns_with_type_and_name(row_desc); } - + static MutableBlock build_mutable_mem_reuse_block(Block* block, const RowDescriptor& row_desc) { + if (!block->mem_reuse()) { + MutableBlock tmp(VectorizedUtils::create_columns_with_type_and_name(row_desc)); + block->swap(tmp.to_block()); + } + return MutableBlock::build_mutable_block(block); + } + static MutableBlock build_mutable_mem_reuse_block(Block* block, const Block& other) { + if (!block->mem_reuse()) { + MutableBlock tmp(other.clone_empty()); + block->swap(tmp.to_block()); + } + return MutableBlock::build_mutable_block(block); + } + static MutableBlock build_mutable_mem_reuse_block(Block* block, + std::vector<SlotDescriptor*>& slots) { + if (!block->mem_reuse()) { + size_t column_size = slots.size(); + MutableColumns columns(column_size); + for (size_t i = 0; i < column_size; i++) { + columns[i] = slots[i]->get_empty_mutable_column(); + } + int n_columns = 0; + for (const auto slot_desc : slots) { + block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } + return MutableBlock(block); + } static ColumnsWithTypeAndName create_columns_with_type_and_name( const RowDescriptor& row_desc, bool ignore_trivial_slot = true) { ColumnsWithTypeAndName columns_with_type_and_name; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org