This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e3458b79a1 [Try_Fix](scan) try fix the scanner schedule logic to prevent excessive memory usage and timeout (#30515)
6e3458b79a1 is described below

commit 6e3458b79a1af6e0b18360fb12709f5d66b0e8b9
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Jan 29 23:41:55 2024 +0800

    [Try_Fix](scan) try fix the scanner schedule logic to prevent excessive memory usage and timeout (#30515)
---
 be/src/vec/exec/scan/pip_scanner_context.h | 14 ++++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp   |  6 +++---
 be/src/vec/exec/scan/scanner_context.h     |  5 +++--
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index c2490a5e0d0..484fe5b4003 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -40,7 +40,7 @@ public:
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
-                                int id, bool wait = false) override {
+                                int id) override {
         {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
@@ -97,6 +97,11 @@ public:
             (*block)->set_columns(std::move(m.mutable_columns()));
         }
 
+        // after return free blocks, should try to reschedule the scanner
+        if (should_be_scheduled()) {
+            this->reschedule_scanner_ctx();
+        }
+
         return Status::OK();
     }
 
@@ -284,7 +289,7 @@ public:
                                          limit_, max_bytes_in_blocks_queue, 1, 
local_state,
                                          dependency) {}
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
-                                int id, bool wait = false) override {
+                                int id) override {
         if (_blocks_queue_buffered.empty()) {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
@@ -349,6 +354,11 @@ public:
         }
         return_free_blocks();
 
+        // after return free blocks, should try to reschedule the scanner
+        if (should_be_scheduled()) {
+            this->reschedule_scanner_ctx();
+        }
+
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 7691f159509..033709f950e 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -273,7 +273,7 @@ bool ScannerContext::empty_in_queue(int id) {
 }
 
 Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
-                                            bool* eos, int id, bool wait) {
+                                            bool* eos, int id) {
     std::vector<vectorized::BlockUPtr> merge_blocks;
     {
         std::unique_lock l(_transfer_lock);
@@ -295,9 +295,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         }
 
         // Wait for block from queue
-        if (wait) {
-            // scanner batch wait time
+        {
             SCOPED_TIMER(_scanner_wait_batch_timer);
+            // scanner batch wait time
             while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
                 if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
                     LOG(INFO) << debug_string();
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 28aec83d6a2..59e4c45a52a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -89,7 +89,7 @@ public:
     // Set eos to true if there is no more data to read.
     // And if eos is true, the block returned must be nullptr.
     virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
-                                        bool* eos, int id, bool wait = true);
+                                        bool* eos, int id);
 
     [[nodiscard]] Status validate_block_schema(Block* block);
 
@@ -134,7 +134,8 @@ public:
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
     inline bool should_be_scheduled() const {
-        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+               (_serving_blocks_num < allowed_blocks_num());
     }
 
     int get_available_thread_slot_num() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to