This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9a19581a2c5 [improvement](scanner_schedule) reduce memory consumption of scanner #24199 (#25547)
9a19581a2c5 is described below

commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Fri Nov 3 19:10:30 2023 +0800

    [improvement](scanner_schedule) reduce memory consumption of scanner #24199 (#25547)
---
 be/src/exec/exec_node.cpp                  |  3 +-
 be/src/pipeline/exec/scan_operator.cpp     |  2 +-
 be/src/runtime/plan_fragment_executor.cpp  |  1 +
 be/src/vec/exec/scan/pip_scanner_context.h |  7 +--
 be/src/vec/exec/scan/scanner_context.cpp   | 68 +++++++++++++++++-------------
 be/src/vec/exec/scan/scanner_context.h     | 28 +++++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++----
 7 files changed, 66 insertions(+), 55 deletions(-)
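
The core of this change is that scanner scheduling is now gated by an estimated per-block byte size in addition to the queue's byte usage, rather than by the has_free_block flag and the free-block counter alone, so a scanner context stops scheduling more scanner work once the number of in-flight blocks would exceed what its byte budget allows. A minimal standalone sketch of that gating arithmetic follows; member names mirror the diff below, but the surrounding ScannerContext class, locking and profiling counters are omitted, so this is not the committed code:

    // Standalone sketch of the scheduling gate introduced by this patch.
    // Not the real ScannerContext: locking, counters and the parallel-instance
    // handling are left out.
    #include <algorithm>
    #include <cstdint>

    struct SchedulingGateSketch {
        int64_t max_bytes_in_queue;     // byte budget (already scaled by instance count)
        int64_t estimated_block_bytes;  // running estimate, floored at 16 bytes
        int32_t free_blocks_capacity;   // _max_thread_num * _block_per_scanner
        int64_t cur_bytes_in_queue = 0;
        int32_t serving_blocks_num = 0; // blocks currently held by running scanners

        int32_t allowed_blocks_num() const {
            // Cap the block count by how many estimated-size blocks fit the budget.
            int64_t by_bytes =
                    (max_bytes_in_queue + estimated_block_bytes - 1) / estimated_block_bytes;
            return static_cast<int32_t>(std::min<int64_t>(free_blocks_capacity, by_bytes));
        }

        bool should_be_scheduled() const {
            // Schedule more scanner work only while the queue is below half its
            // byte budget and the number of in-flight blocks is below the cap.
            return cur_bytes_in_queue < max_bytes_in_queue / 2 &&
                   serving_blocks_num < allowed_blocks_num();
        }
    };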

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 0dc8df911b2..c9e327ad640 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -41,6 +41,7 @@
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
+#include "util/stack_util.h"
 #include "util/uid_util.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
@@ -205,7 +206,7 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
+    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed. ";
     _is_closed = true;
 
     Status result;
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f34461a9fd2..1f15b1d61f8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
             return true;
         } else {
             if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
-                _node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
+                _node->_scanner_ctx->should_be_scheduled()) {
                 _node->_scanner_ctx->reschedule_scanner_ctx();
             }
             return _node->ready_to_read(); // there are some blocks to process
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index df5f4b7d3e4..ff2c0b8688a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,6 +53,7 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/pretty_printer.h"
+#include "util/stack_util.h"
 #include "util/telemetry/telemetry.h"
 #include "util/threadpool.h"
 #include "util/time.h"
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index b98c628368e..f52bd3bf3c7 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -166,10 +166,6 @@ public:
         _free_blocks_memory_usage->add(free_blocks_memory_usage);
     }
 
-    bool has_enough_space_in_blocks_queue() const override {
-        return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
-    }
-
     void _dispose_coloate_blocks_not_in_queue() override {
         if (_need_colocate_distribute) {
             for (int i = 0; i < _num_parallel_instances; ++i) {
@@ -221,8 +217,7 @@ private:
                     std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
                     _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
                 }
-                bool get_block_not_empty = true;
-                _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
+                _colocate_blocks[loc] = get_free_block();
                 _colocate_mutable_blocks[loc]->set_muatable_columns(
                         _colocate_blocks[loc]->mutate_columns());
             }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 478d9fb4cb7..8deb2153478 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
           _process_status(Status::OK()),
           _batch_size(state_->batch_size()),
           limit(limit_),
-          _max_bytes_in_queue(max_bytes_in_blocks_queue_),
+          _max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
           _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
           _scanners(scanners_),
           _num_parallel_instances(num_parallel_instances) {
@@ -63,26 +63,21 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
     if (limit < 0) {
         limit = -1;
     }
-}
 
-// After init function call, should not access _parent
-Status ScannerContext::init() {
-    // 1. Calculate max concurrency
-    // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
-    // should find a more reasonable value.
     _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
-    if (_parent->_shared_scan_opt) {
-        DCHECK(_num_parallel_instances > 0);
-        _max_thread_num *= _num_parallel_instances;
-    }
+    _max_thread_num *= num_parallel_instances;
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     DCHECK(_max_thread_num > 0);
     _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+    // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if (_parent->should_run_serial()) {
         _max_thread_num = 1;
     }
+}
 
+// After init function call, should not access _parent
+Status ScannerContext::init() {
     _scanner_profile = _parent->_scanner_profile;
     _scanner_sched_counter = _parent->_scanner_sched_counter;
     _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
@@ -104,6 +99,9 @@ Status ScannerContext::init() {
             limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
     _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
     _free_blocks_capacity = _max_thread_num * _block_per_scanner;
+    auto block = get_free_block();
+    _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
+    return_free_block(std::move(block));
 
 #ifndef BE_TEST
     // 3. get thread token
@@ -123,27 +121,33 @@ Status ScannerContext::init() {
     return Status::OK();
 }
 
-vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
-                                                     bool get_block_not_empty) {
+vectorized::BlockUPtr ScannerContext::get_free_block() {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
-        if (!get_block_not_empty || block->mem_reuse()) {
-            _free_blocks_capacity--;
-            _free_blocks_memory_usage->add(-block->allocated_bytes());
-            return block;
-        }
+        DCHECK(block->mem_reuse());
+        _free_blocks_memory_usage->add(-block->allocated_bytes());
+        _serving_blocks_num++;
+        return block;
     }
 
+    block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
+                                             true /*ignore invalid slots*/);
     COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-    return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
-                                            true /*ignore invalid slots*/);
+
+    _serving_blocks_num++;
+    return block;
 }
 
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
-    block->clear_column_data();
-    _free_blocks_memory_usage->add(block->allocated_bytes());
-    _free_blocks.enqueue(std::move(block));
-    ++_free_blocks_capacity;
+    _serving_blocks_num--;
+    if (block->mem_reuse()) {
+        // Only put blocks that still have a schema back into the free blocks,
+        // because colocate blocks need the schema.
+        _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
+        block->clear_column_data();
+        _free_blocks_memory_usage->add(block->allocated_bytes());
+        _free_blocks.enqueue(std::move(block));
+    }
 }
 
void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
@@ -176,7 +180,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // (if the scheduler continues to schedule, it will cause a lot of busy running).
     // At this point, consumers are required to trigger new scheduling to ensure that
     // data can be continuously fetched.
-    if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
+    if (should_be_scheduled() && _num_running_scanners == 0) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -184,6 +188,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             set_status_on_error(state, false);
         }
     }
+
     // Wait for block from queue
     if (wait) {
         SCOPED_TIMER(_scanner_wait_batch_timer);
@@ -207,6 +212,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
 
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
+
         _queued_blocks_memory_usage->add(-block_bytes);
         return Status::OK();
     } else {
@@ -353,7 +359,13 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _scanners.push_front(scanner);
     }
     std::lock_guard l(_transfer_lock);
-    if (has_enough_space_in_blocks_queue()) {
+
+    // In pipeline engine, doris will close scanners when `no_schedule`.
+    // We have to decrease _num_running_scanners before scheduling, otherwise
+    // scheduling will not work due to _num_running_scanners.
+    _num_running_scanners--;
+
+    if (should_be_scheduled()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -373,8 +385,6 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }
-    // In pipeline engine, doris will close scanners when `no_schedule`.
-    _num_running_scanners--;
     _ctx_finish_cv.notify_one();
 }
 
@@ -384,7 +394,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
     {
         // If there are enough space in blocks queue,
         // the scanner number depends on the _free_blocks numbers
-        thread_slot_num = cal_thread_slot_num_by_free_block_num();
+        thread_slot_num = get_available_thread_slot_num();
     }
 
     // 2. get #thread_slot_num scanners from ctx->scanners
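
The scanner_context.cpp hunks above also simplify the free-block pool: get_free_block() no longer takes flags, every block handed out is counted in _serving_blocks_num, and return_free_block() recycles only blocks that can reuse their memory, refreshing _estimated_block_bytes from the real allocation size as it does. A rough sketch of that lifecycle, with vectorized::Block and the moodycamel::ConcurrentQueue replaced by stand-ins, so illustrative only:

    // Rough sketch of the free-block lifecycle after this patch. Block is a
    // stand-in for vectorized::Block and a plain deque replaces the real
    // concurrent queue, so this is not the committed code.
    #include <algorithm>
    #include <atomic>
    #include <cstdint>
    #include <deque>
    #include <memory>

    struct Block {
        int64_t allocated = 4096;
        bool reusable = true;
        int64_t allocated_bytes() const { return allocated; }
        bool mem_reuse() const { return reusable; }
        void clear_column_data() { /* keep the schema, drop the row data */ }
    };

    struct FreeBlockPoolSketch {
        std::deque<std::unique_ptr<Block>> free_blocks;
        std::atomic<int32_t> serving_blocks_num{0};
        std::atomic<int64_t> estimated_block_bytes{16};

        std::unique_ptr<Block> get_free_block() {
            serving_blocks_num++;
            if (!free_blocks.empty()) {
                auto block = std::move(free_blocks.front());
                free_blocks.pop_front();
                return block;                   // reused block, schema already set
            }
            return std::make_unique<Block>();   // newly created block
        }

        void return_free_block(std::unique_ptr<Block> block) {
            serving_blocks_num--;
            if (block->mem_reuse()) {
                // Only blocks that keep their schema go back to the pool; the
                // size estimate is refreshed from the real allocation.
                estimated_block_bytes = std::max<int64_t>(block->allocated_bytes(), 16);
                block->clear_column_data();
                free_blocks.push_back(std::move(block));
            }
            // Non-reusable blocks are simply dropped here.
        }
    };
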
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 3aad0d6263f..a345bfc03dd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -62,12 +62,12 @@ public:
     ScannerContext(RuntimeState* state_, VScanNode* parent,
                    const TupleDescriptor* output_tuple_desc,
                    const std::list<VScannerSPtr>& scanners_, int64_t limit_,
-                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);
+                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1);
 
     virtual ~ScannerContext() = default;
     virtual Status init();
 
-    vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
+    vectorized::BlockUPtr get_free_block();
     void return_free_block(std::unique_ptr<vectorized::Block> block);
 
     // Append blocks from scanners to the blocks queue.
@@ -136,20 +136,25 @@ public:
     virtual bool empty_in_queue(int id);
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
-    virtual inline bool has_enough_space_in_blocks_queue() const {
-        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+    inline bool should_be_scheduled() const {
+        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+               (_serving_blocks_num < allowed_blocks_num());
     }
 
-    int cal_thread_slot_num_by_free_block_num() {
+    int get_available_thread_slot_num() {
         int thread_slot_num = 0;
-        thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
+        thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
         thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
-        if (thread_slot_num <= 0) {
-            thread_slot_num = 1;
-        }
         return thread_slot_num;
     }
 
+    int32_t allowed_blocks_num() const {
+        int32_t blocks_num = std::min(_free_blocks_capacity,
+                                      int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
+                                              _estimated_block_bytes));
+        return blocks_num;
+    }
+
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -203,10 +208,12 @@ protected:
 
     // Lazy-allocated blocks for all scanners to share, for memory reuse.
     moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
+    std::atomic<int32_t> _serving_blocks_num = 0;
     // The current number of free blocks available to the scanners.
     // Used to limit the memory usage of the scanner.
     // NOTE: this is NOT the size of `_free_blocks`.
-    std::atomic_int32_t _free_blocks_capacity = 0;
+    int32_t _free_blocks_capacity = 0;
+    int64_t _estimated_block_bytes = 0;
 
     int _batch_size;
     // The limit from SQL's limit clause
@@ -231,6 +238,7 @@ protected:
     int64_t _cur_bytes_in_queue = 0;
     // The max limit bytes of blocks in blocks queue
     const int64_t _max_bytes_in_queue;
+    std::atomic<int64_t> _bytes_allocated = 0;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     // List "scanners" saves all "unfinished" scanners.
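
Tying the header math above to concrete numbers (illustrative values, not taken from the patch or from Doris defaults): with roughly a 400 MB queue budget, a ~2 MB estimated block size and a free-block capacity of 128, the byte-based cap is not the limiting factor, and 128 allowed blocks translate into 8 scanner thread slots when 16 blocks are budgeted per scanner. Note also that the old clamp to a minimum of one slot was removed, so the result can now be zero.

    // Illustrative numbers only: how allowed_blocks_num() and
    // get_available_thread_slot_num() combine after this patch.
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
        const int64_t max_bytes_in_queue = 4LL * 100 * 1024 * 1024; // 100 MB x 4 instances
        const int64_t estimated_block_bytes = 2 * 1024 * 1024;      // ~2 MB per block
        const int32_t free_blocks_capacity = 128;
        const int32_t block_per_scanner = 16;
        const int32_t max_thread_num = 12;
        const int32_t num_running_scanners = 2;

        const int32_t allowed_blocks = static_cast<int32_t>(std::min<int64_t>(
                free_blocks_capacity,
                (max_bytes_in_queue + estimated_block_bytes - 1) / estimated_block_bytes));
        const int32_t thread_slots =
                std::min((allowed_blocks + block_per_scanner - 1) / block_per_scanner,
                         max_thread_num - num_running_scanners);
        // Prints: allowed_blocks=128 thread_slots=8
        std::printf("allowed_blocks=%d thread_slots=%d\n", allowed_blocks, thread_slots);
        return 0;
    }
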
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 3481128a1d2..2529ce67e5e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -321,7 +321,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-    bool has_free_block = true;
     int num_rows_in_block = 0;
 
     // Only set to true when ctx->done() return true.
@@ -331,9 +330,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     bool should_stop = false;
     // Has to wait at least one full block, or it will cause a lot of schedule task in priority
     // queue, it will affect query latency and query concurrency for example ssb 3.3.
-    while (!eos && raw_bytes_read < raw_bytes_threshold &&
-           ((raw_rows_read < raw_rows_threshold && has_free_block) ||
-            num_rows_in_block < state->batch_size())) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
+           num_rows_in_block < state->batch_size()) {
         if (UNLIKELY(ctx->done())) {
             // No need to set status on error here.
             // Because done() maybe caused by "should_stop"
@@ -341,7 +339,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
             break;
         }
 
-        BlockUPtr block = ctx->get_free_block(&has_free_block);
+        BlockUPtr block = ctx->get_free_block();
         status = scanner->get_block(state, block.get(), &eos);
         VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
         // The VFileScanner for external table may try to open not exist files,
@@ -357,12 +355,11 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
         if (status.is<ErrorCode::NOT_FOUND>()) {
             // The only case in this "if" branch is external table file delete and fe cache has not been updated yet.
             // Set status to OK.
-            LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name();
             status = Status::OK();
             eos = true;
         }
 
-        raw_bytes_read += block->bytes();
+        raw_bytes_read += block->allocated_bytes();
         num_rows_in_block += block->rows();
         if (UNLIKELY(block->rows() == 0)) {
             ctx->return_free_block(std::move(block));
@@ -397,7 +394,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
-
     ctx->push_back_scanner_and_reschedule(scanner);
 }
 


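Finally, the scanner_scheduler.cpp hunks above tighten the per-schedule read loop: the has_free_block escape hatch is gone, bytes are accounted with allocated_bytes() instead of bytes(), and the loop stops as soon as any of the row, byte or batch-size thresholds is reached. A compressed sketch of that loop shape, with the scanner, context and error handling stubbed out, so again not the committed code:

    // Compressed sketch of the read loop after this patch; Scanner and Block
    // are stand-ins and eos/error handling is reduced to the bare minimum.
    #include <cstdint>

    struct Block {
        int64_t rows() const { return 4096; }
        int64_t allocated_bytes() const { return 1 << 20; }
    };

    struct Scanner {
        // Pretend every call fills one block and then reports end-of-stream.
        void get_block(Block* /*block*/, bool* eos) { *eos = true; }
    };

    void scan_once(Scanner& scanner, int batch_size, int64_t raw_rows_threshold,
                   int64_t raw_bytes_threshold) {
        bool eos = false;
        int64_t raw_rows_read = 0;
        int64_t raw_bytes_read = 0;  // accumulated from allocated_bytes(), not bytes()
        int64_t num_rows_in_block = 0;
        while (!eos && raw_bytes_read < raw_bytes_threshold &&
               raw_rows_read < raw_rows_threshold && num_rows_in_block < batch_size) {
            Block block;                  // in the real code: ctx->get_free_block()
            scanner.get_block(&block, &eos);
            raw_bytes_read += block.allocated_bytes();
            raw_rows_read += block.rows();
            num_rows_in_block += block.rows();
            // in the real code: append the block to the context queue, or return
            // it to the pool if it came back empty
        }
    }
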
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
