This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 804586b34279caecec1182773d9cbb695555781a Author: Pxl <pxl...@qq.com> AuthorDate: Tue May 7 10:34:04 2024 +0800 [Improvement](sort) insert data by batch on VSortedRunMerger::get_next (#34363) insert data by batch on VSortedRunMerger::get_next --- be/src/vec/columns/column.cpp | 7 ++++ be/src/vec/columns/column.h | 3 ++ be/src/vec/common/sort/partition_sorter.cpp | 2 +- be/src/vec/common/sort/sorter.cpp | 2 +- be/src/vec/core/sort_cursor.h | 4 +- .../data_types/serde/data_type_nullable_serde.cpp | 5 +-- be/src/vec/runtime/vsorted_run_merger.cpp | 45 +++++++++++++--------- be/src/vec/runtime/vsorted_run_merger.h | 4 ++ 8 files changed, 47 insertions(+), 25 deletions(-) diff --git a/be/src/vec/columns/column.cpp b/be/src/vec/columns/column.cpp index 9156428b893..85e36d163e4 100644 --- a/be/src/vec/columns/column.cpp +++ b/be/src/vec/columns/column.cpp @@ -46,6 +46,13 @@ void IColumn::insert_from(const IColumn& src, size_t n) { insert(src[n]); } +void IColumn::insert_from_multi_column(const std::vector<const IColumn*>& srcs, + std::vector<size_t> positions) { + for (size_t i = 0; i < srcs.size(); ++i) { + insert_from(*srcs[i], positions[i]); + } +} + void IColumn::sort_column(const ColumnSorter* sorter, EqualFlags& flags, IColumn::Permutation& perms, EqualRange& range, bool last_column) const { sorter->sort_column(static_cast<const IColumn&>(*this), flags, perms, range, last_column); diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 950eb53cde4..4a889a0e5d0 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -241,6 +241,9 @@ public: } } + virtual void insert_from_multi_column(const std::vector<const IColumn*>& srcs, + std::vector<size_t> positions); + /// Appends a batch elements from other column with the same type /// indices_begin + indices_end represent the row indices of column src virtual void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index a03646a7e9a..1ea7c6de6a8 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -189,7 +189,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int break; } - if (!current->isLast()) { + if (!current->is_last()) { current->next(); priority_queue.push(current); } diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index db3cca8bf09..cfbd3cb41c8 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -128,7 +128,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized offset_--; } - if (!current->isLast()) { + if (!current->is_last()) { current->next(); priority_queue_.push(current); } diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 368c8fd5e42..e565819c9d6 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -163,8 +163,8 @@ struct MergeSortCursorImpl { rows = all_columns[0]->size(); } - bool isFirst() const { return pos == 0; } - bool isLast() const { return pos + 1 >= rows; } + bool is_first() const { return pos == 0; } + bool is_last() const { return pos + 1 >= rows; } void next() { ++pos; } virtual bool has_next_block() { return false; } diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp index fa8f9580f79..1393913b5c4 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -288,15 +288,14 @@ template <bool is_binary_format> Status DataTypeNullableSerDe::_write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, int row_idx, bool col_const) const { - auto& col = assert_cast<const ColumnNullable&>(column); - auto& nested_col = col.get_nested_column(); - col_const = col_const || is_column_const(nested_col); + const auto& col = assert_cast<const ColumnNullable&>(column); const auto col_index = index_check_const(row_idx, col_const); if (col.has_null() && col.is_null_at(col_index)) { if (UNLIKELY(0 != result.push_null())) { return Status::InternalError("pack mysql buffer failed."); } } else { + const auto& nested_col = col.get_nested_column(); RETURN_IF_ERROR( nested_serde->write_column_to_mysql(nested_col, result, col_index, col_const)); } diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 3637bd54aed..3b17f957deb 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -131,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } - if (current->isFirst()) { + if (current->is_first()) { if (current->block_ptr() != nullptr) { current->block_ptr()->swap(*output_block); if (_pipeline_engine_enabled) { @@ -174,30 +174,42 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { num_columns, merged_columns.size()); } + _indexs.reserve(_batch_size); + _block_addrs.reserve(_batch_size); + + auto do_insert = [&]() { + _column_addrs.resize(_indexs.size()); + for (size_t i = 0; i < num_columns; ++i) { + for (size_t j = 0; j < _indexs.size(); j++) { + _column_addrs[j] = _block_addrs[j]->get_by_position(i).column.get(); + } + merged_columns[i]->insert_from_multi_column(_column_addrs, _indexs); + } + _indexs.clear(); + _block_addrs.clear(); + _column_addrs.clear(); + }; + /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!_priority_queue.empty()) { + while (merged_rows != _batch_size && !_priority_queue.empty()) { auto current = _priority_queue.top(); _priority_queue.pop(); if (_offset > 0) { _offset--; } else { - for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); - } + _indexs.emplace_back(current->pos); + _block_addrs.emplace_back(current->block_ptr()); ++merged_rows; } - // In pipeline engine, needs to check if the sender is readable before the next reading. if (!next_heap(current)) { + do_insert(); return Status::OK(); } - - if (merged_rows == _batch_size) { - break; - } } + do_insert(); output_block->set_columns(std::move(merged_columns)); if (merged_rows == 0) { @@ -215,17 +227,14 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } bool VSortedRunMerger::next_heap(MergeSortCursor& current) { - if (!current->isLast()) { + if (!current->is_last()) { current->next(); _priority_queue.push(current); - } else if (_pipeline_engine_enabled) { - // need to check sender is readable again before the next reading. - _pending_cursor = current.impl; - return false; - } else if (has_next_block(current)) { - _priority_queue.push(current); + return true; } - return true; + + _pending_cursor = current.impl; + return false; } inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 00fe44e7d69..943956d8c38 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -93,6 +93,10 @@ protected: // Times calls to get the next batch of rows from the input run. RuntimeProfile::Counter* _get_next_block_timer = nullptr; + std::vector<size_t> _indexs; + std::vector<Block*> _block_addrs; + std::vector<const IColumn*> _column_addrs; + private: void init_timers(RuntimeProfile* profile); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org