This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5986b3cfc6a7c537e58ed08831a926b5619b79cd Author: yiguolei <676222...@qq.com> AuthorDate: Thu Sep 12 14:07:15 2024 +0800 add reserve memory logic in scan operator (#40719) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/pipeline/exec/scan_operator.cpp | 25 +++++++++++++++++++++++++ be/src/pipeline/exec/scan_operator.h | 2 ++ be/src/pipeline/pipeline_task.cpp | 2 ++ be/src/runtime/query_context.h | 14 ++++++++------ be/src/vec/exec/scan/scanner_context.h | 9 ++++----- be/src/vec/exec/scan/scanner_scheduler.cpp | 9 ++++++++- 6 files changed, 49 insertions(+), 12 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0c0cfb18c77..b69b10461d6 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1305,6 +1305,31 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: return Status::OK(); } +template <typename LocalStateType> +size_t ScanOperatorX<LocalStateType>::get_reserve_mem_size(RuntimeState* state) { + auto& local_state = get_local_state(state); + if (local_state.low_memory_mode()) { + return local_state._scanner_ctx->low_memory_mode_scan_bytes_per_scanner() * + local_state._scanner_ctx->low_memory_mode_scanners(); + } else { + if (local_state._scanner_peak_memory_usage->value() > 0) { + // This is only a safety check, in case some counter is inaccurate. + if (local_state._scanner_peak_memory_usage->value() > + local_state._scanner_ctx->block_memory_usage()) { + return local_state._scanner_peak_memory_usage->value() - + local_state._scanner_ctx->block_memory_usage(); + } else { + return config::doris_scanner_row_bytes; + } + } else { + // If the scan operator is running for the first time, assume it will occupy doris_scanner_row_bytes. + // It may be a little smaller than the actual usage.
+ return config::doris_scanner_row_bytes; + // return local_state._scanner_ctx->max_bytes_in_queue(); + } + } +} + template class ScanOperatorX<OlapScanLocalState>; template class ScanLocalState<OlapScanLocalState>; template class ScanOperatorX<JDBCScanLocalState>; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index fed1e4015d8..e228b76caa9 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -366,6 +366,8 @@ public: [[nodiscard]] virtual bool is_file_scan_operator() const { return false; } + [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override; + const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { return _runtime_filter_descs; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index ed8c04a5e70..e4e82bb2207 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -362,6 +362,8 @@ Status PipelineTask::execute(bool* eos) { }); DEFER_RELEASE_RESERVED(); + // Every loop should check if memory is not enough. + _state->get_query_ctx()->update_low_memory_mode(); // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index fdbfc7c7217..0a19cd61f94 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -270,9 +270,9 @@ public: // 3. the process reached soft mem_limit, if not release these, if not release buffer, the query will be cancelled. // 4. If the query reserve memory failed. 
// Under low memory mode, the query should release some buffers such as scan operator block queue, union operator queue, exchange buffer size, streaming agg - bool low_memory_mode() { + void update_low_memory_mode() { if (_low_memory_mode) { - return true; + return; } // If less than 100MB left, then it is low memory mode @@ -280,7 +280,7 @@ public: _low_memory_mode = true; LOG(INFO) << "Query " << print_id(_query_id) << " goes to low memory mode due to exceed process soft memory limit"; - return true; + return; } if (_workload_group) { @@ -292,7 +292,7 @@ public: << "Query " << print_id(_query_id) << " goes to low memory mode due to workload group high water mark reached"; _low_memory_mode = true; - return true; + return; } if (is_low_wartermark && @@ -303,15 +303,17 @@ public: << " goes to low memory mode due to workload group low water mark " "reached and the query enable spill"; _low_memory_mode = true; - return true; + return; } } - return _low_memory_mode; + return; } void set_low_memory_mode() { _low_memory_mode = true; } + bool low_memory_mode() { return _low_memory_mode; } + private: int _timeout_second; TUniqueId _query_id; diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index cb9bd8e71b0..200c36cdc98 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -124,6 +124,8 @@ public: void return_free_block(vectorized::BlockUPtr block); inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; } + int64_t block_memory_usage() { return _block_memory_usage; } + // Caller should make sure the pipeline task is still running when calling this function void update_peak_running_scanner(int num); // Caller should make sure the pipeline task is still running when calling this function @@ -172,12 +174,12 @@ public: int batch_size() const { return _batch_size; } // During low memory mode, there will be at most 4 scanners running and every scanner will - // cache at 
most 2MB data. So that every instance will keep 8MB buffer. + // cache at most 1MB of data, so every instance will keep at most 4MB of buffer. bool low_memory_mode() const; // TODO(yiguolei) add this as session variable int32_t low_memory_mode_scan_bytes_per_scanner() const { - return 2 * 1024 * 1024; // 2MB + return 1 * 1024 * 1024; // 1MB } int32_t low_memory_mode_scanners() const { return 4; } @@ -190,9 +192,6 @@ public: bool _should_reset_thread_name = true; - const static int LOW_MEMORY_MODE_SCAN_BYTES = 2 * 1024 * 1024; // 2MB - const static int LOW_MEMORY_MODE_MAX_SCANNERS = 4; - protected: /// Four criteria to determine whether to increase the parallelism of the scanners /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2352f10ca0c..87eda999b41 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -271,8 +271,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, size_t raw_bytes_read = 0; bool first_read = true; + // True once the first block is full, or once the first and second blocks together exceed batch_size. + bool has_first_full_block = false; - while (!eos && raw_bytes_read < raw_bytes_threshold) { + // During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
+ while (!eos && raw_bytes_read < raw_bytes_threshold && + !(ctx->low_memory_mode() && has_first_full_block)) { if (UNLIKELY(ctx->done())) { eos = true; break; @@ -318,6 +322,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() - block_size); } else { + if (!scan_task->cached_blocks.empty()) { + has_first_full_block = true; + } ctx->inc_block_usage(free_block->allocated_bytes()); scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org