This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new f3d4f8d98b3 add low memory mode in scan operator (#40662) f3d4f8d98b3 is described below commit f3d4f8d98b37725ace0d62f9678c92f696ba56f4 Author: yiguolei <676222...@qq.com> AuthorDate: Wed Sep 11 15:04:02 2024 +0800 add low memory mode in scan operator (#40662) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/pipeline/exec/operator.h | 2 + be/src/pipeline/pipeline_task.cpp | 1 + be/src/pipeline/task_scheduler.cpp | 3 +- be/src/runtime/query_context.h | 49 +++++++++++++++ .../workload_group/workload_group_manager.cpp | 10 +++ be/src/vec/exec/scan/scanner_context.cpp | 72 ++++++++++++++-------- be/src/vec/exec/scan/scanner_context.h | 16 ++++- be/src/vec/exec/scan/scanner_scheduler.cpp | 9 ++- 8 files changed, 134 insertions(+), 28 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 265cf31f648..c6b49317375 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -198,6 +198,8 @@ public: void reset_estimate_memory_usage() { _estimate_memory_usage = 0; } + bool low_memory_mode() { return _state->get_query_ctx()->low_memory_mode(); } + protected: friend class OperatorXBase; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index fe44f8a4b3f..8051bb5d4e6 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -388,6 +388,7 @@ Status PipelineTask::execute(bool* eos) { << ", sink name: " << _sink->get_name() << ", node id: " << _sink->node_id() << " failed: " << st.to_string() << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); + _state->get_query_ctx()->set_low_memory_mode(); bool is_high_wartermark = false; bool is_low_wartermark = false; workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 715feceed98..30f3302d429 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -58,9 +58,8 @@ TaskScheduler::~TaskScheduler() { Status TaskScheduler::start() { int cores = _task_queue->cores(); - // Init the thread pool with cores+1 thread + // Init the thread pool with cores thread // some for pipeline task running - // 1 for spill disk query handler RETURN_IF_ERROR(ThreadPoolBuilder(_name) .set_min_threads(cores) .set_max_threads(cores) diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index eb2beb2ba05..fdbfc7c7217 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -264,6 +264,54 @@ public: } } + // Query will run in low memory mode when + // 1. the query is enable spill and wg's low water mark reached, if not release buffer, it will trigger spill disk, it is very expensive. + // 2. the query is not enable spill, but wg's high water mark reached, if not release buffer, the query will be cancelled. + // 3. the process reached soft mem_limit, if not release these, if not release buffer, the query will be cancelled. + // 4. If the query reserve memory failed. + // Under low memory mode, the query should release some buffers such as scan operator block queue, union operator queue, exchange buffer size, streaming agg + bool low_memory_mode() { + if (_low_memory_mode) { + return true; + } + + // If less than 100MB left, then it is low memory mode + if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 * 1024)) { + _low_memory_mode = true; + LOG(INFO) << "Query " << print_id(_query_id) + << " goes to low memory mode due to exceed process soft memory limit"; + return true; + } + + if (_workload_group) { + bool is_low_wartermark = false; + bool is_high_wartermark = false; + _workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark); + if (is_high_wartermark) { + LOG(INFO) + << "Query " << print_id(_query_id) + << " goes to low memory mode due to workload group high water mark reached"; + _low_memory_mode = true; + return true; + } + + if (is_low_wartermark && + ((_query_options.__isset.enable_join_spill && _query_options.enable_join_spill) || + (_query_options.__isset.enable_sort_spill && _query_options.enable_sort_spill) || + (_query_options.__isset.enable_agg_spill && _query_options.enable_agg_spill))) { + LOG(INFO) << "Query " << print_id(_query_id) + << " goes to low memory mode due to workload group low water mark " + "reached and the query enable spill"; + _low_memory_mode = true; + return true; + } + } + + return _low_memory_mode; + } + + void set_low_memory_mode() { _low_memory_mode = true; } + private: int _timeout_second; TUniqueId _query_id; @@ -313,6 +361,7 @@ private: std::mutex _pipeline_map_write_lock; std::atomic<int64_t> _spill_threshold {0}; + std::atomic<bool> _low_memory_mode = false; std::mutex _profile_mutex; timespec _query_arrival_timestamp; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 39ab564db63..431eaa248ac 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -335,6 +335,8 @@ void WorkloadGroupMgr::handle_paused_queries() { wg->check_mem_used(&is_low_wartermark, &is_high_wartermark); if (!is_low_wartermark && !is_high_wartermark) { + // TODO: should check if there is a large reserve size in the query's operators + // If it exist, then should find the query and spill it. LOG(INFO) << "**** there are " << queries_list.size() << " to resume"; for (const auto& query : queries_list) { LOG(INFO) << "**** resume paused query: " << query.query_id(); @@ -359,6 +361,14 @@ void WorkloadGroupMgr::handle_paused_queries() { size_t max_memory_usage = 0; auto it_to_remove = queries_list.end(); + // TODO: should check buffer type memory first, if could release many these memory, then not need do spill disk + // Buffer Memory are: + // 1. caches: page cache, segment cache... + // 2. memtables: load memtable + // 3. scan queue, exchange sink buffer, union queue + // 4. streaming aggs. + // If we could not recycle memory from these buffers(< 10%), then do spill disk. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { const auto query_ctx = query_it->query_ctx_.lock(); // The query is finished during in paused list. diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index cbb3d0f5723..ed50c195d74 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -266,7 +266,10 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo update_peak_memory_usage(-current_block->allocated_bytes()); // consume current block block->swap(*current_block); - return_free_block(std::move(current_block)); + // If under low memory mode, should not return the freeblock, it will occupy too memory. + if (!_local_state->low_memory_mode()) { + return_free_block(std::move(current_block)); + } } else { // This scan task do not have any cached blocks. _blocks_queue.pop_front(); @@ -275,37 +278,54 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo _num_finished_scanners++; std::weak_ptr<ScannerDelegate> next_scanner; // submit one of the remaining scanners - if (_scanners.try_dequeue(next_scanner)) { - auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner)); - if (!submit_status.ok()) { - _process_status = submit_status; - _set_scanner_done(); - return _process_status; - } - } else { - // no more scanner to be scheduled - // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners - int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners; + // If under low memory mode, then there should be at most 4 scanner running + if (_num_running_scanners > low_memory_mode_scanners() && + _local_state->low_memory_mode()) { _num_running_scanners--; - for (int i = 0; i < free_blocks_for_each; ++i) { - vectorized::BlockUPtr removed_block; - if (_free_blocks.try_dequeue(removed_block)) { - _block_memory_usage -= block->allocated_bytes(); + } else { + if (_scanners.try_dequeue(next_scanner)) { + auto submit_status = + submit_scan_task(std::make_shared<ScanTask>(next_scanner)); + if (!submit_status.ok()) { + _process_status = submit_status; + _set_scanner_done(); + return _process_status; + } + } else { + // no more scanner to be scheduled + // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners + int free_blocks_for_each = + _free_blocks.size_approx() / _num_running_scanners; + _num_running_scanners--; + for (int i = 0; i < free_blocks_for_each; ++i) { + vectorized::BlockUPtr removed_block; + if (_free_blocks.try_dequeue(removed_block)) { + _block_memory_usage -= block->allocated_bytes(); + } } } } } else { - // resubmit current running scanner to read the next block - Status submit_status = submit_scan_task(scan_task); - if (!submit_status.ok()) { - _process_status = submit_status; - _set_scanner_done(); - return _process_status; + if (_local_state->low_memory_mode() && + _num_running_scanners > low_memory_mode_scanners()) { + _num_running_scanners--; + // push the scanner to the stack since it is not eos + _scanners.enqueue(scan_task->scanner); + } else { + // resubmit current running scanner to read the next block + Status submit_status = submit_scan_task(scan_task); + if (!submit_status.ok()) { + _process_status = submit_status; + _set_scanner_done(); + return _process_status; + } } } } - // scale up - RETURN_IF_ERROR(_try_to_scale_up()); + if (_local_state->low_memory_mode()) { + // scale up + RETURN_IF_ERROR(_try_to_scale_up()); + } } if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) { @@ -488,4 +508,8 @@ void ScannerContext::update_peak_memory_usage(int64_t usage) { _local_state->_scanner_peak_memory_usage->add(usage); } +bool ScannerContext::low_memory_mode() const { + return _local_state->low_memory_mode(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 03c4e5a4f1b..cb9bd8e71b0 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -171,6 +171,17 @@ public: int batch_size() const { return _batch_size; } + // During low memory mode, there will be at most 4 scanners running and every scanner will + // cache at most 2MB data. So that every instance will keep 8MB buffer. + bool low_memory_mode() const; + + // TODO(yiguolei) add this as session variable + int32_t low_memory_mode_scan_bytes_per_scanner() const { + return 2 * 1024 * 1024; // 2MB + } + + int32_t low_memory_mode_scanners() const { return 4; } + // the unique id of this context std::string ctx_id; TUniqueId _query_id; @@ -179,6 +190,9 @@ public: bool _should_reset_thread_name = true; + const static int LOW_MEMORY_MODE_SCAN_BYTES = 2 * 1024 * 1024; // 2MB + const static int LOW_MEMORY_MODE_MAX_SCANNERS = 4; + protected: /// Four criteria to determine whether to increase the parallelism of the scanners /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up @@ -217,7 +231,7 @@ protected: moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners; int32_t _num_scheduled_scanners = 0; int32_t _num_finished_scanners = 0; - int32_t _num_running_scanners = 0; + std::atomic_int _num_running_scanners = 0; // weak pointer for _scanners, used in stop function std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners; const int _num_parallel_instances; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 444ff4dbb0c..2352f10ca0c 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -264,7 +264,14 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; - size_t raw_bytes_read = 0; bool first_read = true; + if (ctx->low_memory_mode() && + raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) { + raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner(); + } + + size_t raw_bytes_read = 0; + bool first_read = true; + while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org