This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bedad15f0394970731e38a727f40369320c4ed42
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Sat Jan 27 00:06:19 2024 +0800

    [enhancement](scanner) add a lower bound for bytes in scanner queue (#29624)
---
 be/src/common/config.cpp                   |  1 +
 be/src/common/config.h                     |  1 +
 be/src/vec/exec/scan/scanner_context.cpp   | 10 ++++++++++
 be/src/vec/exec/scan/scanner_scheduler.cpp |  8 ++------
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 58bb9156b5b..42ff7e8fce7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -260,6 +260,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024");
 DEFINE_mInt32(doris_scanner_row_num, "16384");
 // single read execute fragment row bytes
 DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
+DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
 // number of max scan keys
 DEFINE_mInt32(doris_max_scan_key_num, "48");
 // the max number of push down values of a single column.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8db84b459b8..1b75a1cbbd2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -302,6 +302,7 @@ DECLARE_mInt32(doris_scanner_queue_size);
 DECLARE_mInt32(doris_scanner_row_num);
 // single read execute fragment row bytes
 DECLARE_mInt32(doris_scanner_row_bytes);
+DECLARE_mInt32(min_bytes_in_scanner_queue);
 // number of max scan keys
 DECLARE_mInt32(doris_max_scan_key_num);
 // the max number of push down values of a single column.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 0b19f389920..7691f159509 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -45,6 +45,9 @@ namespace doris::vectorized {
 
 using namespace std::chrono_literals;
 
+static bvar::Status<int64_t> 
g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
+static bvar::Status<int64_t> 
g_num_running_scanners("doris_num_running_scanners", 0);
+
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* 
output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
@@ -179,6 +182,9 @@ Status ScannerContext::init() {
     _free_blocks_capacity = _max_thread_num * _block_per_scanner;
     auto block = get_free_block();
     _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
+    int min_blocks = (config::min_bytes_in_scanner_queue + 
_estimated_block_bytes - 1) /
+                     _estimated_block_bytes;
+    _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
     return_free_block(std::move(block));
 
 #ifndef BE_TEST
@@ -258,6 +264,7 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
     }
     _blocks_queue_added_cv.notify_one();
     _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
+    g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
 }
 
 bool ScannerContext::empty_in_queue(int id) {
@@ -334,6 +341,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         }
     }
 
+    g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
     if (!merge_blocks.empty()) {
         vectorized::MutableBlock m(block->get());
         for (auto& merge_block : merge_blocks) {
@@ -375,6 +383,7 @@ Status ScannerContext::validate_block_schema(Block* block) {
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
     std::lock_guard l(_transfer_lock);
     _num_running_scanners += inc;
+    g_num_running_scanners.set_value(_num_running_scanners);
 }
 
 void ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
@@ -484,6 +493,7 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel
     // before we call the following if() block.
     {
         --_num_running_scanners;
+        g_num_running_scanners.set_value(_num_running_scanners);
         if (scanner->_scanner->need_to_close()) {
             --_num_unfinished_scanners;
             if (_num_unfinished_scanners == 0) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e0fb5afa74a..feab4e9ba4c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -328,8 +328,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     // judge if we need to yield. So we record all raw data read in this round
     // scan, if this exceeds row number or bytes threshold, we yield this 
thread.
     std::vector<vectorized::BlockUPtr> blocks;
-    int64_t raw_rows_read = scanner->get_rows_read();
-    int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     int num_rows_in_block = 0;
@@ -343,11 +341,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     // queue, it will affect query latency and query concurrency for example 
ssb 3.3.
     auto should_do_scan = [&, batch_size = state->batch_size(),
                            time = state->wait_full_block_schedule_times()]() {
-        if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < 
raw_rows_threshold) {
+        if (raw_bytes_read < raw_bytes_threshold) {
             return true;
         } else if (num_rows_in_block < batch_size) {
-            return raw_bytes_read < raw_bytes_threshold * time &&
-                   raw_rows_read < raw_rows_threshold * time;
+            return raw_bytes_read < raw_bytes_threshold * time;
         }
         return false;
     };
@@ -396,7 +393,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
                 blocks.push_back(std::move(block));
             }
         }
-        raw_rows_read = scanner->get_rows_read();
     } // end for while
 
     // if we failed, check status.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to