This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 804586b34279caecec1182773d9cbb695555781a
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue May 7 10:34:04 2024 +0800

    [Improvement](sort) insert data by batch on VSortedRunMerger::get_next (#34363)
    
    insert data by batch on VSortedRunMerger::get_next
---
 be/src/vec/columns/column.cpp                      |  7 ++++
 be/src/vec/columns/column.h                        |  3 ++
 be/src/vec/common/sort/partition_sorter.cpp        |  2 +-
 be/src/vec/common/sort/sorter.cpp                  |  2 +-
 be/src/vec/core/sort_cursor.h                      |  4 +-
 .../data_types/serde/data_type_nullable_serde.cpp  |  5 +--
 be/src/vec/runtime/vsorted_run_merger.cpp          | 45 +++++++++++++---------
 be/src/vec/runtime/vsorted_run_merger.h            |  4 ++
 8 files changed, 47 insertions(+), 25 deletions(-)

diff --git a/be/src/vec/columns/column.cpp b/be/src/vec/columns/column.cpp
index 9156428b893..85e36d163e4 100644
--- a/be/src/vec/columns/column.cpp
+++ b/be/src/vec/columns/column.cpp
@@ -46,6 +46,13 @@ void IColumn::insert_from(const IColumn& src, size_t n) {
     insert(src[n]);
 }
 
+void IColumn::insert_from_multi_column(const std::vector<const IColumn*>& srcs,
+                                       std::vector<size_t> positions) {
+    for (size_t i = 0; i < srcs.size(); ++i) {
+        insert_from(*srcs[i], positions[i]);
+    }
+}
+
 void IColumn::sort_column(const ColumnSorter* sorter, EqualFlags& flags,
                           IColumn::Permutation& perms, EqualRange& range, bool last_column) const {
     sorter->sort_column(static_cast<const IColumn&>(*this), flags, perms, range, last_column);
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 950eb53cde4..4a889a0e5d0 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -241,6 +241,9 @@ public:
         }
     }
 
+    virtual void insert_from_multi_column(const std::vector<const IColumn*>& srcs,
+                                          std::vector<size_t> positions);
+
     /// Appends a batch elements from other column with the same type
     /// indices_begin + indices_end represent the row indices of column src
     virtual void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp
index a03646a7e9a..1ea7c6de6a8 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -189,7 +189,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
             break;
         }
 
-        if (!current->isLast()) {
+        if (!current->is_last()) {
             current->next();
             priority_queue.push(current);
         }
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index db3cca8bf09..cfbd3cb41c8 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -128,7 +128,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
             offset_--;
         }
 
-        if (!current->isLast()) {
+        if (!current->is_last()) {
             current->next();
             priority_queue_.push(current);
         }
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 368c8fd5e42..e565819c9d6 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -163,8 +163,8 @@ struct MergeSortCursorImpl {
         rows = all_columns[0]->size();
     }
 
-    bool isFirst() const { return pos == 0; }
-    bool isLast() const { return pos + 1 >= rows; }
+    bool is_first() const { return pos == 0; }
+    bool is_last() const { return pos + 1 >= rows; }
     void next() { ++pos; }
 
     virtual bool has_next_block() { return false; }
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index fa8f9580f79..1393913b5c4 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -288,15 +288,14 @@ template <bool is_binary_format>
 Status DataTypeNullableSerDe::_write_column_to_mysql(const IColumn& column,
                                                      MysqlRowBuffer<is_binary_format>& result,
                                                      int row_idx, bool col_const) const {
-    auto& col = assert_cast<const ColumnNullable&>(column);
-    auto& nested_col = col.get_nested_column();
-    col_const = col_const || is_column_const(nested_col);
+    const auto& col = assert_cast<const ColumnNullable&>(column);
     const auto col_index = index_check_const(row_idx, col_const);
     if (col.has_null() && col.is_null_at(col_index)) {
         if (UNLIKELY(0 != result.push_null())) {
             return Status::InternalError("pack mysql buffer failed.");
         }
     } else {
+        const auto& nested_col = col.get_nested_column();
         RETURN_IF_ERROR(
                 nested_serde->write_column_to_mysql(nested_col, result, col_index, col_const));
     }
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index 3637bd54aed..3b17f957deb 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -131,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
             }
         }
 
-        if (current->isFirst()) {
+        if (current->is_first()) {
             if (current->block_ptr() != nullptr) {
                 current->block_ptr()->swap(*output_block);
                 if (_pipeline_engine_enabled) {
@@ -174,30 +174,42 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
                     num_columns, merged_columns.size());
         }
 
+        _indexs.reserve(_batch_size);
+        _block_addrs.reserve(_batch_size);
+
+        auto do_insert = [&]() {
+            _column_addrs.resize(_indexs.size());
+            for (size_t i = 0; i < num_columns; ++i) {
+                for (size_t j = 0; j < _indexs.size(); j++) {
+                    _column_addrs[j] = _block_addrs[j]->get_by_position(i).column.get();
+                }
+                merged_columns[i]->insert_from_multi_column(_column_addrs, _indexs);
+            }
+            _indexs.clear();
+            _block_addrs.clear();
+            _column_addrs.clear();
+        };
+
         /// Take rows from queue in right order and push to 'merged'.
         size_t merged_rows = 0;
-        while (!_priority_queue.empty()) {
+        while (merged_rows != _batch_size && !_priority_queue.empty()) {
             auto current = _priority_queue.top();
             _priority_queue.pop();
 
             if (_offset > 0) {
                 _offset--;
             } else {
-                for (size_t i = 0; i < num_columns; ++i) {
-                    merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
-                }
+                _indexs.emplace_back(current->pos);
+                _block_addrs.emplace_back(current->block_ptr());
                 ++merged_rows;
             }
 
-            // In pipeline engine, needs to check if the sender is readable before the next reading.
             if (!next_heap(current)) {
+                do_insert();
                 return Status::OK();
             }
-
-            if (merged_rows == _batch_size) {
-                break;
-            }
         }
+        do_insert();
         output_block->set_columns(std::move(merged_columns));
 
         if (merged_rows == 0) {
@@ -215,17 +227,14 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
 }
 
 bool VSortedRunMerger::next_heap(MergeSortCursor& current) {
-    if (!current->isLast()) {
+    if (!current->is_last()) {
         current->next();
         _priority_queue.push(current);
-    } else if (_pipeline_engine_enabled) {
-        // need to check sender is readable again before the next reading.
-        _pending_cursor = current.impl;
-        return false;
-    } else if (has_next_block(current)) {
-        _priority_queue.push(current);
+        return true;
     }
-    return true;
+
+    _pending_cursor = current.impl;
+    return false;
 }
 
 inline bool VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h
index 00fe44e7d69..943956d8c38 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -93,6 +93,10 @@ protected:
     // Times calls to get the next batch of rows from the input run.
     RuntimeProfile::Counter* _get_next_block_timer = nullptr;
 
+    std::vector<size_t> _indexs;
+    std::vector<Block*> _block_addrs;
+    std::vector<const IColumn*> _column_addrs;
+
 private:
     void init_timers(RuntimeProfile* profile);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to