This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9a1402da070 [opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938) 9a1402da070 is described below commit 9a1402da070bd8833715b0099af06b929803532b Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Jan 15 10:28:20 2024 +0800 [opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938) --- be/src/vec/exec/scan/pip_scanner_context.h | 85 +++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 8f79e3021a5..77237bb71af 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -289,42 +289,51 @@ public: dependency) {} Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = false) override { - std::unique_lock l(_transfer_lock); - if (state->is_cancelled()) { - set_status_on_error(Status::Cancelled("cancelled"), false); - } + if (_blocks_queue_buffered.empty()) { + std::unique_lock l(_transfer_lock); + if (state->is_cancelled()) { + set_status_on_error(Status::Cancelled("cancelled"), false); + } - if (!status().ok()) { - return _process_status; - } + if (!status().ok()) { + return _process_status; + } - std::vector<vectorized::BlockUPtr> merge_blocks; - if (_blocks_queue.empty()) { - *eos = done(); - return Status::OK(); - } - if (_process_status.is<ErrorCode::CANCELLED>()) { - *eos = true; - return Status::OK(); + if (_blocks_queue.empty()) { + *eos = done(); + return Status::OK(); + } + if (_process_status.is<ErrorCode::CANCELLED>()) { + *eos = true; + return Status::OK(); + } + + _blocks_queue_buffered = std::move(_blocks_queue); } - *block = std::move(_blocks_queue.front()); - _blocks_queue.pop_front(); + *block = std::move(_blocks_queue_buffered.front()); + _blocks_queue_buffered.pop_front(); + std::vector<vectorized::BlockUPtr> merge_blocks; auto rows = (*block)->rows(); - while (!_blocks_queue.empty()) { - const auto add_rows = (*_blocks_queue.front()).rows(); + while (!_blocks_queue_buffered.empty()) { + const auto add_rows = (*_blocks_queue_buffered.front()).rows(); if (rows + add_rows < state->batch_size()) { rows += add_rows; - merge_blocks.emplace_back(std::move(_blocks_queue.front())); - _blocks_queue.pop_front(); + merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front())); + _blocks_queue_buffered.pop_front(); } else { break; } } - if (_blocks_queue.empty()) { - this->reschedule_scanner_ctx(); - _dependency->block(); + if (_blocks_queue_buffered.empty()) { + std::unique_lock l(_transfer_lock); + if (_blocks_queue.empty()) { + this->reschedule_scanner_ctx(); + _dependency->block(); + } else { + _blocks_queue_buffered = std::move(_blocks_queue); + } } _cur_bytes_in_queue -= (*block)->allocated_bytes(); @@ -333,10 +342,13 @@ public: for (auto& merge_block : merge_blocks) { _cur_bytes_in_queue -= merge_block->allocated_bytes(); static_cast<void>(m.merge(*merge_block)); - return_free_block(std::move(merge_block)); + if (merge_block->mem_reuse()) { + _free_blocks_buffered.emplace_back(std::move(merge_block)); + } } (*block)->set_columns(std::move(m.mutable_columns())); } + return_free_blocks(); return Status::OK(); } @@ -353,6 +365,29 @@ public: set_status_on_error(state, false); } } + +private: + void return_free_blocks() { + if (_free_blocks_buffered.empty()) { + return; + } + + size_t total_bytes = 0; + for (auto& block : _free_blocks_buffered) { + const auto bytes = block->allocated_bytes(); + block->clear_column_data(); + _estimated_block_bytes = std::max(bytes, (size_t)16); + total_bytes += bytes; + } + _free_blocks_memory_usage->add(total_bytes); + const auto count = _free_blocks_buffered.size(); + _free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()), count); + _free_blocks_buffered.clear(); + _serving_blocks_num -= count; + } + + std::vector<vectorized::BlockUPtr> _free_blocks_buffered; + std::list<vectorized::BlockUPtr> _blocks_queue_buffered; }; } // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org