This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1402da070 [opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938)
9a1402da070 is described below

commit 9a1402da070bd8833715b0099af06b929803532b
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Jan 15 10:28:20 2024 +0800

    [opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938)
---
 be/src/vec/exec/scan/pip_scanner_context.h | 85 +++++++++++++++++++++---------
 1 file changed, 60 insertions(+), 25 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 8f79e3021a5..77237bb71af 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -289,42 +289,51 @@ public:
                                          dependency) {}
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
                                 int id, bool wait = false) override {
-        std::unique_lock l(_transfer_lock);
-        if (state->is_cancelled()) {
-            set_status_on_error(Status::Cancelled("cancelled"), false);
-        }
+        if (_blocks_queue_buffered.empty()) {
+            std::unique_lock l(_transfer_lock);
+            if (state->is_cancelled()) {
+                set_status_on_error(Status::Cancelled("cancelled"), false);
+            }
 
-        if (!status().ok()) {
-            return _process_status;
-        }
+            if (!status().ok()) {
+                return _process_status;
+            }
 
-        std::vector<vectorized::BlockUPtr> merge_blocks;
-        if (_blocks_queue.empty()) {
-            *eos = done();
-            return Status::OK();
-        }
-        if (_process_status.is<ErrorCode::CANCELLED>()) {
-            *eos = true;
-            return Status::OK();
+            if (_blocks_queue.empty()) {
+                *eos = done();
+                return Status::OK();
+            }
+            if (_process_status.is<ErrorCode::CANCELLED>()) {
+                *eos = true;
+                return Status::OK();
+            }
+
+            _blocks_queue_buffered = std::move(_blocks_queue);
         }
-        *block = std::move(_blocks_queue.front());
-        _blocks_queue.pop_front();
+        *block = std::move(_blocks_queue_buffered.front());
+        _blocks_queue_buffered.pop_front();
 
+        std::vector<vectorized::BlockUPtr> merge_blocks;
         auto rows = (*block)->rows();
-        while (!_blocks_queue.empty()) {
-            const auto add_rows = (*_blocks_queue.front()).rows();
+        while (!_blocks_queue_buffered.empty()) {
+            const auto add_rows = (*_blocks_queue_buffered.front()).rows();
             if (rows + add_rows < state->batch_size()) {
                 rows += add_rows;
-                merge_blocks.emplace_back(std::move(_blocks_queue.front()));
-                _blocks_queue.pop_front();
+                merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front()));
+                _blocks_queue_buffered.pop_front();
             } else {
                 break;
             }
         }
 
-        if (_blocks_queue.empty()) {
-            this->reschedule_scanner_ctx();
-            _dependency->block();
+        if (_blocks_queue_buffered.empty()) {
+            std::unique_lock l(_transfer_lock);
+            if (_blocks_queue.empty()) {
+                this->reschedule_scanner_ctx();
+                _dependency->block();
+            } else {
+                _blocks_queue_buffered = std::move(_blocks_queue);
+            }
         }
 
         _cur_bytes_in_queue -= (*block)->allocated_bytes();
@@ -333,10 +342,13 @@ public:
             for (auto& merge_block : merge_blocks) {
                 _cur_bytes_in_queue -= merge_block->allocated_bytes();
                 static_cast<void>(m.merge(*merge_block));
-                return_free_block(std::move(merge_block));
+                if (merge_block->mem_reuse()) {
+                    _free_blocks_buffered.emplace_back(std::move(merge_block));
+                }
             }
             (*block)->set_columns(std::move(m.mutable_columns()));
         }
+        return_free_blocks();
 
         return Status::OK();
     }
@@ -353,6 +365,29 @@ public:
             set_status_on_error(state, false);
         }
     }
+
+private:
+    void return_free_blocks() {
+        if (_free_blocks_buffered.empty()) {
+            return;
+        }
+
+        size_t total_bytes = 0;
+        for (auto& block : _free_blocks_buffered) {
+            const auto bytes = block->allocated_bytes();
+            block->clear_column_data();
+            _estimated_block_bytes = std::max(bytes, (size_t)16);
+            total_bytes += bytes;
+        }
+        _free_blocks_memory_usage->add(total_bytes);
+        const auto count = _free_blocks_buffered.size();
+        _free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()), count);
+        _free_blocks_buffered.clear();
+        _serving_blocks_num -= count;
+    }
+
+    std::vector<vectorized::BlockUPtr> _free_blocks_buffered;
+    std::list<vectorized::BlockUPtr> _blocks_queue_buffered;
 };
 
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to