This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new ae50d7a614a [Scanner](revert) revert the scanner change by apache#35604 (#31508)
ae50d7a614a is described below

commit ae50d7a614a8c62b47aea59340054676be62cdf2
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Jun 3 00:07:07 2024 +0800

    [Scanner](revert) revert the scanner change by apache#35604 (#31508)
---
 be/src/vec/exec/scan/scanner_context.cpp | 29 +++++++++++++++--------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index fd570c99a68..813643978d0 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -296,12 +296,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
                     // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
                     int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners;
                     _num_running_scanners--;
-                    std::vector<vectorized::BlockUPtr> blocks(free_blocks_for_each);
-                    free_blocks_for_each =
-                            _free_blocks.try_dequeue_bulk(blocks.data(), free_blocks_for_each);
                     for (int i = 0; i < free_blocks_for_each; ++i) {
-                        _free_blocks_memory_usage -= blocks[i]->allocated_bytes();
-                        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                        vectorized::BlockUPtr removed_block;
+                        if (_free_blocks.try_dequeue(removed_block)) {
+                            _free_blocks_memory_usage -= block->allocated_bytes();
+                            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                        }
                     }
                 }
             } else {
@@ -350,16 +350,17 @@ void ScannerContext::_try_to_scale_up() {
                         (_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
                 num_add = std::min(num_add, most_add);
             }
-            std::vector<std::weak_ptr<ScannerDelegate>> scale_up_scanners(num_add);
-            // get enough memory to launch one more scanner.
-            if (auto real_size = _scanners.try_dequeue_bulk(scale_up_scanners.data(), num_add);
-                real_size) {
-                for (int i = 0; i < real_size; ++i) {
-                    submit_scan_task(std::make_shared<ScanTask>(scale_up_scanners[i]));
+            for (int i = 0; i < num_add; ++i) {
+                // get enough memory to launch one more scanner.
+                std::weak_ptr<ScannerDelegate> scale_up_scanner;
+                if (_scanners.try_dequeue(scale_up_scanner)) {
+                    submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
+                    _num_running_scanners++;
+                    _scale_up_scanners_counter->update(1);
+                    is_scale_up = true;
+                } else {
+                    break;
                 }
-                _num_running_scanners += real_size;
-                _scale_up_scanners_counter->update(real_size);
-                is_scale_up = true;
             }
 
             if (is_scale_up) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org