This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a912f77bd79 [fix](scanner) Fix incorrect _max_thread_num in scanner 
context (#40569)
a912f77bd79 is described below

commit a912f77bd79a5c7a7b4cbcfa91f035d3453d9a21
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Thu Sep 12 10:37:14 2024 +0800

    [fix](scanner) Fix incorrect _max_thread_num in scanner context (#40569)
    
    1. Minor refactor of the scanner context constructor: the calculation
    of _max_thread_num is moved to the init method.
    2. The expected value of _max_thread_num is changed. There is no need to
    submit too many scan tasks to the scan scheduler, since the number of
    threads is limited.
    3. The calculation of _max_bytes_in_queue is changed. _max_bytes_in_queue
    for each scan instance is limited to 100MB by default.
---
 be/src/pipeline/exec/scan_operator.cpp             |  11 +-
 be/src/vec/exec/scan/scanner_context.cpp           | 120 ++++++++++++---------
 be/src/vec/exec/scan/scanner_context.h             |   9 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   5 +-
 4 files changed, 79 insertions(+), 66 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 0c0cfb18c77..eb30d62495d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -996,16 +996,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            state()->scan_queue_mem_limit(), _scan_dependency,
-            // NOTE: This will logic makes _max_thread_num of ScannerContext 
to be C(num of cores) * 2
-            // For a query with C/2 instance and M scan node, scan task of 
this query will be C/2 * M * C*2
-            // and will be C*C*N at most.
-            // 1. If data distribution is ignored , we use 1 instance to scan.
-            // 2. Else if this operator is not file scan operator, we use 
config::doris_scanner_thread_pool_thread_num scanners to scan.
-            // 3. Else, file scanner will consume much memory so we use 
config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num 
scanners to scan.
-            p.ignore_data_distribution() || !p.is_file_scan_operator()
-                    ? 1
-                    : state()->query_parallel_instance_num());
+            _scan_dependency, p.ignore_data_distribution());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index cbb3d0f5723..b5cb47fda1b 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -42,8 +42,7 @@ ScannerContext::ScannerContext(
         RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* 
output_row_descriptor,
         const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& 
scanners, int64_t limit_,
-        int64_t max_bytes_in_blocks_queue, 
std::shared_ptr<pipeline::Dependency> dependency,
-        const int num_parallel_instances)
+        std::shared_ptr<pipeline::Dependency> dependency, bool 
ignore_data_distribution)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -53,53 +52,102 @@ ScannerContext::ScannerContext(
           _output_row_descriptor(output_row_descriptor),
           _batch_size(state->batch_size()),
           limit(limit_),
-          _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, 
(int64_t)1024) *
-                              num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _num_parallel_instances(num_parallel_instances) {
+          _ignore_data_distribution(ignore_data_distribution) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
-    // Provide more memory for wide tables, increase proportionally by 
multiples of 300
-    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
-    if (scanners.empty()) {
-        _is_finished = true;
-        _set_scanner_done();
-    }
     _scanners.enqueue_bulk(scanners.begin(), scanners.size());
     if (limit < 0) {
         limit = -1;
     }
     MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+    _query_thread_context = {_query_id, _state->query_mem_tracker(),
+                             _state->get_query_ctx()->workload_group()};
+    _dependency = dependency;
+}
+
+// After init function call, should not access _parent
+Status ScannerContext::init() {
+    _scanner_profile = _local_state->_scanner_profile;
+    _scanner_sched_counter = _local_state->_scanner_sched_counter;
+    _newly_create_free_blocks_num = 
_local_state->_newly_create_free_blocks_num;
+    _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
+    _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
+    _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
+
+#ifndef BE_TEST
+    // 3. get thread token
+    if (_state->get_query_ctx()) {
+        thread_token = _state->get_query_ctx()->get_token();
+        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
+        if (_simple_scan_scheduler) {
+            _should_reset_thread_name = false;
+        }
+        _remote_scan_task_scheduler = 
_state->get_query_ctx()->get_remote_scan_scheduler();
+    }
+#endif
+    _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
+                                                    thread_token == nullptr ? 
"False" : "True");
+
+    int num_parallel_instances = _state->query_parallel_instance_num();
+
+    // NOTE: When ignore_data_distribution is true, the parallelism
+    // of the scan operator is regarded as 1 (actually maybe not).
+    // That allows a very large number of scan tasks to be submitted to
+    // the scheduler. This logic is kept from the older implementation.
+    // https://github.com/apache/doris/pull/28266
+    if (_ignore_data_distribution) {
+        num_parallel_instances = 1;
+    }
+
+    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
+    // scan_queue_mem_limit on FE is 100MB by default; on the backend we make sure its actual value
+    // is at least 10MB.
+    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), 
(int64_t)1024 * 1024 * 10);
+
+    // Provide more memory for wide tables, increase proportionally by 
multiples of 300
+    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+
+    // TODO: Where is the proper position to place this code?
+    if (_all_scanners.empty()) {
+        _is_finished = true;
+        _set_scanner_done();
+    }
+
     // _max_thread_num controls how many scanners of this ScanOperator can be 
submitted to scheduler at a time.
     // The overall target of our system is to make full utilization of the 
resources.
-    // At the same time, we dont want too many tasks are queued by scheduler, 
that makes the query
-    // waiting too long, and existing task can not be scheduled in time.
-    // First of all, we try to make sure _max_thread_num of a ScanNode of a 
query on a single backend is less than
-    // config::doris_scanner_thread_pool_thread_num.
+    // At the same time, we don't want too many tasks to be queued by the scheduler; that is not necessary.
+    // So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is no more than
+    // 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all io threads busy.
     // For example, on a 64-core machine, the default value of 
config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
     // and the num_parallel_instances of this scan operator will be 64/2=32.
-    // For a query who has two scan nodes, the _max_thread_num of each scan 
node instance will be 128 / 32 = 4.
-    // We have 32 instances of this scan operator, so for the ScanNode, we 
have 4 * 32 = 128 scanner tasks can be submitted at a time.
-    // Remember that we have to ScanNode in this query, so the total number of 
scanner tasks can be submitted at a time is 128 * 2 = 256.
+    // For a query that has one scan node, the _max_thread_num of each scan node instance will be 2 * 128 / 32 = 8.
+    // We have 32 instances of this scan operator, so for the ScanNode, 8 * 32 = 256 scanner tasks can be submitted at a time.
+    // The scanner thread pool has 128 threads, which means 128 tasks will be running in parallel while another 128 tasks wait in the queue.
+    // When the first 128 tasks are finished, the next 128 tasks will be taken from the queue and executed,
+    // and another 128 tasks will be submitted to the queue if any remain.
     _max_thread_num =
             _state->num_scanner_threads() > 0
                     ? _state->num_scanner_threads()
-                    : config::doris_scanner_thread_pool_thread_num / 
num_parallel_instances;
+                    : 2 * (config::doris_scanner_thread_pool_thread_num / 
num_parallel_instances);
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     // In some situation, there are not too many big tablets involed, so we 
can reduce the thread number.
-    _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
+    // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
+    _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());
+
     // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if (_local_state->should_run_serial()) {
         _max_thread_num = 1;
     }
+
     // when user not specify scan_thread_num, so we can try downgrade 
_max_thread_num.
     // becaue we found in a table with 5k columns, column reader may ocuppy 
too much memory.
     // you can refer https://github.com/apache/doris/issues/35340 for details.
-    int32_t max_column_reader_num = 
state->query_options().max_column_reader_num;
+    int32_t max_column_reader_num = 
_state->query_options().max_column_reader_num;
     if (_max_thread_num != 1 && max_column_reader_num > 0) {
         int32_t scan_column_num = _output_tuple_desc->slots().size();
         int32_t current_column_num = scan_column_num * _max_thread_num;
@@ -109,7 +157,7 @@ ScannerContext::ScannerContext(
             if (new_max_thread_num < _max_thread_num) {
                 int32_t origin_max_thread_num = _max_thread_num;
                 _max_thread_num = new_max_thread_num;
-                LOG(INFO) << "downgrade query:" << print_id(state->query_id())
+                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                           << " scan's max_thread_num from " << 
origin_max_thread_num << " to "
                           << _max_thread_num << ",column num: " << 
scan_column_num
                           << ", max_column_reader_num: " << 
max_column_reader_num;
@@ -117,35 +165,7 @@ ScannerContext::ScannerContext(
         }
     }
 
-    _query_thread_context = {_query_id, _state->query_mem_tracker(),
-                             _state->get_query_ctx()->workload_group()};
-    _dependency = dependency;
-}
-
-// After init function call, should not access _parent
-Status ScannerContext::init() {
-    _scanner_profile = _local_state->_scanner_profile;
-    _scanner_sched_counter = _local_state->_scanner_sched_counter;
-    _newly_create_free_blocks_num = 
_local_state->_newly_create_free_blocks_num;
-    _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
-    _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
-    _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
-
-#ifndef BE_TEST
-    // 3. get thread token
-    if (_state->get_query_ctx()) {
-        thread_token = _state->get_query_ctx()->get_token();
-        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
-        if (_simple_scan_scheduler) {
-            _should_reset_thread_name = false;
-        }
-        _remote_scan_task_scheduler = 
_state->get_query_ctx()->get_remote_scan_scheduler();
-    }
-#endif
-
     COUNTER_SET(_local_state->_max_scanner_thread_num, 
(int64_t)_max_thread_num);
-    _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
-                                                    thread_token == nullptr ? 
"False" : "True");
 
     // submit `_max_thread_num` running scanners to `ScannerScheduler`
     // When a running scanners is finished, it will submit one of the 
remaining scanners.
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 03c4e5a4f1b..36eb20c220d 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -105,9 +105,8 @@ public:
                    const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
-                   int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                   std::shared_ptr<pipeline::Dependency> dependency,
-                   const int num_parallel_instances);
+                   int64_t limit_, std::shared_ptr<pipeline::Dependency> 
dependency,
+                   bool ignore_data_distribution);
 
     ~ScannerContext() override {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -210,7 +209,7 @@ protected:
     int64_t limit;
 
     int32_t _max_thread_num = 0;
-    int64_t _max_bytes_in_queue;
+    int64_t _max_bytes_in_queue = 0;
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
     SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
@@ -220,7 +219,6 @@ protected:
     int32_t _num_running_scanners = 0;
     // weak pointer for _scanners, used in stop function
     std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
-    const int _num_parallel_instances;
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
@@ -229,6 +227,7 @@ protected:
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     QueryThreadContext _query_thread_context;
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
+    bool _ignore_data_distribution = false;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fef1cc5bb29..d42e2e828ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -693,7 +693,10 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
     public long maxExecMemByte = 2147483648L;
 
-    @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
+    @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
+            description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
+                    "How many bytes of block can be saved in the block queue 
of each Scan Instance"})
+    // 100MB
     public long maxScanQueueMemByte = 2147483648L / 20;
 
     @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, 
description = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to