This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6e3458b79a1 [Try_Fix](scan) try fix the scanner schedule logic to prevent excessive memory usage and timeout (#30515)
6e3458b79a1 is described below

commit 6e3458b79a1af6e0b18360fb12709f5d66b0e8b9
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Jan 29 23:41:55 2024 +0800

    [Try_Fix](scan) try fix the scanner schedule logic to prevent excessive memory usage and timeout (#30515)
---
 be/src/vec/exec/scan/pip_scanner_context.h | 14 ++++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp   |  6 +++---
 be/src/vec/exec/scan/scanner_context.h     |  5 +++--
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index c2490a5e0d0..484fe5b4003 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -40,7 +40,7 @@ public:
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
-                                int id, bool wait = false) override {
+                                int id) override {
         {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
@@ -97,6 +97,11 @@ public:
             (*block)->set_columns(std::move(m.mutable_columns()));
         }
 
+        // after return free blocks, should try to reschedule the scanner
+        if (should_be_scheduled()) {
+            this->reschedule_scanner_ctx();
+        }
+
         return Status::OK();
     }
 
@@ -284,7 +289,7 @@ public:
                              limit_, max_bytes_in_blocks_queue, 1, local_state, dependency) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
-                                int id, bool wait = false) override {
+                                int id) override {
         if (_blocks_queue_buffered.empty()) {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
@@ -349,6 +354,11 @@ public:
         }
 
         return_free_blocks();
+        // after return free blocks, should try to reschedule the scanner
+        if (should_be_scheduled()) {
+            this->reschedule_scanner_ctx();
+        }
+
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 7691f159509..033709f950e 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -273,7 +273,7 @@ bool ScannerContext::empty_in_queue(int id) {
 }
 
 Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
-                                            bool* eos, int id, bool wait) {
+                                            bool* eos, int id) {
     std::vector<vectorized::BlockUPtr> merge_blocks;
     {
         std::unique_lock l(_transfer_lock);
@@ -295,9 +295,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         }
 
         // Wait for block from queue
-        if (wait) {
-            // scanner batch wait time
+        {
             SCOPED_TIMER(_scanner_wait_batch_timer);
+            // scanner batch wait time
             while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
                 if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
                     LOG(INFO) << debug_string();
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 28aec83d6a2..59e4c45a52a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -89,7 +89,7 @@ public:
     // Set eos to true if there is no more data to read.
     // And if eos is true, the block returned must be nullptr.
     virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
-                                        bool* eos, int id, bool wait = true);
+                                        bool* eos, int id);
 
     [[nodiscard]] Status validate_block_schema(Block* block);
 
@@ -134,7 +134,8 @@ public:
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
     inline bool should_be_scheduled() const {
-        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+               (_serving_blocks_num < allowed_blocks_num());
     }
 
     int get_available_thread_slot_num() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org