This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9a19581a2c5 [improvement](scanner_schedule) reduce memory consumption of scanner #24199 (#25547) 9a19581a2c5 is described below commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7 Author: Mingyu Chen <morning...@163.com> AuthorDate: Fri Nov 3 19:10:30 2023 +0800 [improvement](scanner_schedule) reduce memory consumption of scanner #24199 (#25547) --- be/src/exec/exec_node.cpp | 3 +- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/runtime/plan_fragment_executor.cpp | 1 + be/src/vec/exec/scan/pip_scanner_context.h | 7 +-- be/src/vec/exec/scan/scanner_context.cpp | 68 +++++++++++++++++------------- be/src/vec/exec/scan/scanner_context.h | 28 +++++++----- be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++---- 7 files changed, 66 insertions(+), 55 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 0dc8df911b2..c9e327ad640 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -41,6 +41,7 @@ #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "util/runtime_profile.h" +#include "util/stack_util.h" #include "util/uid_util.h" #include "vec/columns/column_nullable.h" #include "vec/core/block.h" @@ -205,7 +206,7 @@ Status ExecNode::close(RuntimeState* state) { << " already closed"; return Status::OK(); } - LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed"; + LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed. 
"; _is_closed = true; Status result; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index f34461a9fd2..1f15b1d61f8 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -44,7 +44,7 @@ bool ScanOperator::can_read() { return true; } else { if (_node->_scanner_ctx->get_num_running_scanners() == 0 && - _node->_scanner_ctx->has_enough_space_in_blocks_queue()) { + _node->_scanner_ctx->should_be_scheduled()) { _node->_scanner_ctx->reschedule_scanner_ctx(); } return _node->ready_to_read(); // there are some blocks to process diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index df5f4b7d3e4..ff2c0b8688a 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -53,6 +53,7 @@ #include "util/container_util.hpp" #include "util/defer_op.h" #include "util/pretty_printer.h" +#include "util/stack_util.h" #include "util/telemetry/telemetry.h" #include "util/threadpool.h" #include "util/time.h" diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index b98c628368e..f52bd3bf3c7 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -166,10 +166,6 @@ public: _free_blocks_memory_usage->add(free_blocks_memory_usage); } - bool has_enough_space_in_blocks_queue() const override { - return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances; - } - void _dispose_coloate_blocks_not_in_queue() override { if (_need_colocate_distribute) { for (int i = 0; i < _num_parallel_instances; ++i) { @@ -221,8 +217,7 @@ private: std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]); _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); } - bool get_block_not_empty = true; - _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty); + _colocate_blocks[loc] = 
get_free_block(); _colocate_mutable_blocks[loc]->set_muatable_columns( _colocate_blocks[loc]->mutate_columns()); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 478d9fb4cb7..8deb2153478 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V _process_status(Status::OK()), _batch_size(state_->batch_size()), limit(limit_), - _max_bytes_in_queue(max_bytes_in_blocks_queue_), + _max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances), _scanner_scheduler(state_->exec_env()->scanner_scheduler()), _scanners(scanners_), _num_parallel_instances(num_parallel_instances) { @@ -63,26 +63,21 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V if (limit < 0) { limit = -1; } -} -// After init function call, should not access _parent -Status ScannerContext::init() { - // 1. Calculate max concurrency - // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 - // should find a more reasonable value. _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; - if (_parent->_shared_scan_opt) { - DCHECK(_num_parallel_instances > 0); - _max_thread_num *= _num_parallel_instances; - } + _max_thread_num *= num_parallel_instances; _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; DCHECK(_max_thread_num > 0); _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); + // 1. Calculate max concurrency // For select * from table limit 10; should just use one thread. 
if (_parent->should_run_serial()) { _max_thread_num = 1; } +} +// After init function call, should not access _parent +Status ScannerContext::init() { _scanner_profile = _parent->_scanner_profile; _scanner_sched_counter = _parent->_scanner_sched_counter; _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter; @@ -104,6 +99,9 @@ Status ScannerContext::init() { limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; _free_blocks_capacity = _max_thread_num * _block_per_scanner; + auto block = get_free_block(); + _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); + return_free_block(std::move(block)); #ifndef BE_TEST // 3. get thread token @@ -123,27 +121,33 @@ Status ScannerContext::init() { return Status::OK(); } -vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, - bool get_block_not_empty) { +vectorized::BlockUPtr ScannerContext::get_free_block() { vectorized::BlockUPtr block; if (_free_blocks.try_dequeue(block)) { - if (!get_block_not_empty || block->mem_reuse()) { - _free_blocks_capacity--; - _free_blocks_memory_usage->add(-block->allocated_bytes()); - return block; - } + DCHECK(block->mem_reuse()); + _free_blocks_memory_usage->add(-block->allocated_bytes()); + _serving_blocks_num++; + return block; } + block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); COUNTER_UPDATE(_newly_create_free_blocks_num, 1); - return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, - true /*ignore invalid slots*/); + + _serving_blocks_num++; + return block; } void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) { - block->clear_column_data(); - _free_blocks_memory_usage->add(block->allocated_bytes()); - _free_blocks.enqueue(std::move(block)); - ++_free_blocks_capacity; + _serving_blocks_num--; + if 
(block->mem_reuse()) { + // Only put blocks with schema to free blocks, because colocate blocks + // need schema. + _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); + block->clear_column_data(); + _free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks.enqueue(std::move(block)); + } } void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) { @@ -176,7 +180,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo // (if the scheduler continues to schedule, it will cause a lot of busy running). // At this point, consumers are required to trigger new scheduling to ensure that // data can be continuously fetched. - if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) { + if (should_be_scheduled() && _num_running_scanners == 0) { auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; @@ -184,6 +188,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo set_status_on_error(state, false); } } + // Wait for block from queue if (wait) { SCOPED_TIMER(_scanner_wait_batch_timer); @@ -207,6 +212,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes; + _queued_blocks_memory_usage->add(-block_bytes); return Status::OK(); } else { @@ -353,7 +359,13 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { _scanners.push_front(scanner); } std::lock_guard l(_transfer_lock); - if (has_enough_space_in_blocks_queue()) { + + // In pipeline engine, doris will close scanners when `no_schedule`. + // We have to decrease _num_running_scanners before schedule, otherwise + // schedule does not work due to _num_running_scanners. 
+ _num_running_scanners--; + + if (should_be_scheduled()) { auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; @@ -373,8 +385,6 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { _is_finished = true; _blocks_queue_added_cv.notify_one(); } - // In pipeline engine, doris will close scanners when `no_schedule`. - _num_running_scanners--; _ctx_finish_cv.notify_one(); } @@ -384,7 +394,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current { // If there are enough space in blocks queue, // the scanner number depends on the _free_blocks numbers - thread_slot_num = cal_thread_slot_num_by_free_block_num(); + thread_slot_num = get_available_thread_slot_num(); } // 2. get #thread_slot_num scanners from ctx->scanners diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 3aad0d6263f..a345bfc03dd 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -62,12 +62,12 @@ public: ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* output_tuple_desc, const std::list<VScannerSPtr>& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0); + int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1); virtual ~ScannerContext() = default; virtual Status init(); - vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false); + vectorized::BlockUPtr get_free_block(); void return_free_block(std::unique_ptr<vectorized::Block> block); // Append blocks from scanners to the blocks queue. 
@@ -136,20 +136,25 @@ public: virtual bool empty_in_queue(int id); // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan - virtual inline bool has_enough_space_in_blocks_queue() const { - return _cur_bytes_in_queue < _max_bytes_in_queue / 2; + inline bool should_be_scheduled() const { + return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) && + (_serving_blocks_num < allowed_blocks_num()); } - int cal_thread_slot_num_by_free_block_num() { + int get_available_thread_slot_num() { int thread_slot_num = 0; - thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner; + thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner; thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); - if (thread_slot_num <= 0) { - thread_slot_num = 1; - } return thread_slot_num; } + int32_t allowed_blocks_num() const { + int32_t blocks_num = std::min(_free_blocks_capacity, + int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) / + _estimated_block_bytes)); + return blocks_num; + } + void reschedule_scanner_ctx(); // the unique id of this context @@ -203,10 +208,12 @@ protected: // Lazy-allocated blocks for all scanners to share, for memory reuse. moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks; + std::atomic<int32_t> _serving_blocks_num = 0; // The current number of free blocks available to the scanners. // Used to limit the memory usage of the scanner. // NOTE: this is NOT the size of `_free_blocks`. 
- std::atomic_int32_t _free_blocks_capacity = 0; + int32_t _free_blocks_capacity = 0; + int64_t _estimated_block_bytes = 0; int _batch_size; // The limit from SQL's limit clause @@ -231,6 +238,7 @@ protected: int64_t _cur_bytes_in_queue = 0; // The max limit bytes of blocks in blocks queue const int64_t _max_bytes_in_queue; + std::atomic<int64_t> _bytes_allocated = 0; doris::vectorized::ScannerScheduler* _scanner_scheduler; // List "scanners" saves all "unfinished" scanners. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 3481128a1d2..2529ce67e5e 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -321,7 +321,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; - bool has_free_block = true; int num_rows_in_block = 0; // Only set to true when ctx->done() return true. @@ -331,9 +330,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool should_stop = false; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. - while (!eos && raw_bytes_read < raw_bytes_threshold && - ((raw_rows_read < raw_rows_threshold && has_free_block) || - num_rows_in_block < state->batch_size())) { + while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold && + num_rows_in_block < state->batch_size()) { if (UNLIKELY(ctx->done())) { // No need to set status on error here. 
// Because done() maybe caused by "should_stop" @@ -341,7 +339,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext break; } - BlockUPtr block = ctx->get_free_block(&has_free_block); + BlockUPtr block = ctx->get_free_block(); status = scanner->get_block(state, block.get(), &eos); VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos; // The VFileScanner for external table may try to open not exist files, @@ -357,12 +355,11 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (status.is<ErrorCode::NOT_FOUND>()) { // The only case in this "if" branch is external table file delete and fe cache has not been updated yet. // Set status to OK. - LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name(); status = Status::OK(); eos = true; } - raw_bytes_read += block->bytes(); + raw_bytes_read += block->allocated_bytes(); num_rows_in_block += block->rows(); if (UNLIKELY(block->rows() == 0)) { ctx->return_free_block(std::move(block)); @@ -397,7 +394,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (eos || should_stop) { scanner->mark_to_need_to_close(); } - ctx->push_back_scanner_and_reschedule(scanner); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org