github-actions[bot] commented on code in PR #30746:
URL: https://github.com/apache/doris/pull/30746#discussion_r1480947982


##########
be/src/vec/exec/scan/scanner_context.h:
##########
@@ -81,88 +122,49 @@
     virtual Status init();
 
     vectorized::BlockUPtr get_free_block();
-    void return_free_block(std::unique_ptr<vectorized::Block> block);
+    void return_free_block(vectorized::BlockUPtr block);
 
-    // Append blocks from scanners to the blocks queue.
-    virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& 
blocks);
-    // Get next block from blocks queue. Called by ScanNode
+    // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
-    // And if eos is true, the block returned must be nullptr.
-    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
-                                        bool* eos, int id);
+    virtual Status get_block_from_queue(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                        int id, bool wait = true);
 
     [[nodiscard]] Status validate_block_schema(Block* block);
 
-    // When a scanner complete a scan, this method will be called
-    // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> 
scanner);
+    // submit the running scanner to thread pool in `ScannerScheduler`
+    // set the next scanned block to `ScanTask::current_block`
+    // set the error state to `ScanTask::status`
+    // set the `eos` to `ScanTask::eos` if there is no more data in current 
scanner
+    void submit_scan_task(std::shared_ptr<ScanTask> scan_task);

Review Comment:
   warning: parameter 'num_parallel_instances' is const-qualified in the 
function declaration; const-qualification of parameters only has an effect in 
function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                      int64_t max_bytes_in_blocks_queue_, int 
num_parallel_instances,
   ```
   



##########
be/src/vec/exec/scan/scanner_context.h:
##########
@@ -58,6 +58,47 @@
 class ScannerScheduler;
 class SimplifiedScanScheduler;
 
+class ScanTask {
+public:
+    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, 
vectorized::BlockUPtr free_block)
+            : scanner(delegate_scanner), current_block(std::move(free_block)) 
{}
+
+private:
+    // whether current scanner is finished
+    bool eos = false;
+    Status status = Status::OK();
+
+public:
+    std::weak_ptr<ScannerDelegate> scanner;
+    // cache the block of current loop
+    vectorized::BlockUPtr current_block;
+    // only take the size of the first block as estimated size
+    bool first_block = true;
+    uint64_t last_submit_time; // nanoseconds
+
+    void set_status(Status _status) {
+        if (_status.is<ErrorCode::END_OF_FILE>()) {
+            // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error

Review Comment:
   warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' 
[modernize-use-override]
   
   ```suggestion
       ~ScannerContext() override = default;
   ```
   



##########
be/src/vec/exec/scan/scanner_context.h:
##########
@@ -58,6 +58,47 @@ class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
 
+class ScanTask {
+public:
+    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, 
vectorized::BlockUPtr free_block)
+            : scanner(delegate_scanner), current_block(std::move(free_block)) 
{}
+
+private:
+    // whether current scanner is finished
+    bool eos = false;
+    Status status = Status::OK();
+
+public:
+    std::weak_ptr<ScannerDelegate> scanner;
+    // cache the block of current loop
+    vectorized::BlockUPtr current_block;
+    // only take the size of the first block as estimated size
+    bool first_block = true;
+    uint64_t last_submit_time; // nanoseconds
+

Review Comment:
   warning: parameter 'num_parallel_instances' is const-qualified in the 
function declaration; const-qualification of parameters only has an effect in 
function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                      int64_t max_bytes_in_blocks_queue, int 
num_parallel_instances = 1,
   ```
   



##########
be/src/vec/exec/scan/scanner_context.cpp:
##########
@@ -220,138 +162,221 @@ std::string ScannerContext::parent_name() {
 vectorized::BlockUPtr ScannerContext::get_free_block() {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
+        std::lock_guard<std::mutex> fl(_free_blocks_lock);
         DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage->add(-block->allocated_bytes());
-        _serving_blocks_num++;
+        _free_blocks_memory_usage -= block->allocated_bytes();
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         return block;
     }
 
-    block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
-                                             true /*ignore invalid slots*/);
-
-    COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
-    _serving_blocks_num++;
-    return block;
+    _newly_create_free_blocks_num->update(1);
+    return vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
+                                            true /*ignore invalid slots*/);
 }
 
-void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> 
block) {
-    _serving_blocks_num--;
-    if (block->mem_reuse()) {
-        // Only put blocks with schema to free blocks, because colocate blocks
-        // need schema.
-        _estimated_block_bytes = std::max(block->allocated_bytes(), 
(size_t)16);
+void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
+    std::lock_guard<std::mutex> fl(_free_blocks_lock);
+    if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) 
{
         block->clear_column_data();
-        _free_blocks_memory_usage->add(block->allocated_bytes());
+        _free_blocks_memory_usage += block->allocated_bytes();
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         _free_blocks.enqueue(std::move(block));
     }
 }
 
-void 
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& 
blocks) {
-    std::lock_guard l(_transfer_lock);
-    auto old_bytes_in_queue = _cur_bytes_in_queue;
-    for (auto& b : blocks) {
-        auto st = validate_block_schema(b.get());
+bool ScannerContext::empty_in_queue(int id) {
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    return _blocks_queue.empty();
+}
+
+void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
+    _scanner_sched_counter->update(1);
+    _num_scheduled_scanners++;
+    _scanner_scheduler->submit(shared_from_this(), scan_task);
+}
+
+void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> 
scan_task) {
+    if (scan_task->status_ok() && scan_task->current_block->rows() > 0) {
+        Status st = validate_block_schema(scan_task->current_block.get());
         if (!st.ok()) {
-            set_status_on_error(st, false);
+            scan_task->set_status(st);
         }
-        _cur_bytes_in_queue += b->allocated_bytes();
-        _blocks_queue.push_back(std::move(b));
     }
-    blocks.clear();
-    if (_dependency) {
-        _dependency->set_ready();
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    if (!scan_task->status_ok()) {
+        _process_status = scan_task->get_status();
+    }
+    if (_last_scale_up_time == 0) {
+        _last_scale_up_time = UnixMillis();
+    }
+    if (_blocks_queue.empty() && _last_fetch_time != 0) {
+        // there's no block in queue before current block, so the consumer is 
waiting
+        _total_wait_block_time += UnixMillis() - _last_fetch_time;
     }
+    _num_scheduled_scanners--;
+    _blocks_queue.emplace_back(scan_task);
     _blocks_queue_added_cv.notify_one();
-    _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
-    g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
 }
 
-bool ScannerContext::empty_in_queue(int id) {
+Status ScannerContext::get_block_from_queue(RuntimeState* state, 
vectorized::Block* block,
+                                            bool* eos, int id, bool wait) {
+    if (state->is_cancelled()) {
+        _set_scanner_done();
+        return Status::Cancelled("Query cancelled in ScannerContext");
+    }
     std::unique_lock l(_transfer_lock);
-    return _blocks_queue.empty();
-}
+    // Wait for block from queue
+    if (wait) {
+        // scanner batch wait time
+        SCOPED_TIMER(_scanner_wait_batch_timer);
+        while (!done() && _blocks_queue.empty() && _process_status.ok()) {
+            _blocks_queue_added_cv.wait_for(l, 1s);
+        }
+    }
+    if (!_process_status.ok()) {
+        _set_scanner_done();
+        return _process_status;
+    }
+    std::shared_ptr<ScanTask> scan_task = nullptr;
+    if (!_blocks_queue.empty() && !done()) {
+        _last_fetch_time = UnixMillis();
+        scan_task = _blocks_queue.front();
+        _blocks_queue.pop_front();
+    }
 
-Status ScannerContext::get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
-                                            bool* eos, int id) {
-    std::vector<vectorized::BlockUPtr> merge_blocks;
-    {
-        std::unique_lock l(_transfer_lock);
-        // Normally, the scanner scheduler will schedule ctx.
-        // But when the amount of data in the blocks queue exceeds the upper 
limit,
-        // the scheduler will stop scheduling.
-        // (if the scheduler continues to schedule, it will cause a lot of 
busy running).
-        // At this point, consumers are required to trigger new scheduling to 
ensure that
-        // data can be continuously fetched.
-        bool to_be_schedule = should_be_scheduled();
-
-        bool is_scheduled = false;
-        if (!done() && to_be_schedule && _num_running_scanners == 0) {
-            is_scheduled = true;
-            auto submit_status = 
_scanner_scheduler->submit(shared_from_this());
-            if (!submit_status.ok()) {
-                set_status_on_error(submit_status, false);
+    if (scan_task) {
+        if (!scan_task->status_ok()) {
+            _set_scanner_done();
+            return scan_task->get_status();
+        }
+        // We can only know the block size after reading at least one block
+        // Just take the size of first block as `_estimated_block_size`
+        if (scan_task->first_block) {
+            std::lock_guard<std::mutex> fl(_free_blocks_lock);
+            size_t block_size = scan_task->current_block->allocated_bytes();
+            _free_blocks_memory_usage += block_size;
+            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+            scan_task->first_block = false;
+            if (block_size > _estimated_block_size) {
+                _estimated_block_size = block_size;
             }
         }
-
-        // Wait for block from queue
-        {
-            SCOPED_TIMER(_scanner_wait_batch_timer);
-            // scanner batch wait time
-            while (!(!_blocks_queue.empty() || done() || !status().ok() || 
state->is_cancelled())) {
-                if (!is_scheduled && _num_running_scanners == 0 && 
should_be_scheduled()) {
-                    LOG(INFO) << debug_string();
+        // consume current block
+        block->swap(*scan_task->current_block);
+        if (!scan_task->current_block->mem_reuse()) {
+            // it depends on the memory strategy of ScanNode/ScanOperator
+            // we should double check `mem_reuse()` of `current_block` to make 
sure it can be reused
+            _newly_create_free_blocks_num->update(1);
+            scan_task->current_block = 
vectorized::Block::create_unique(_output_tuple_desc->slots(),
+                                                                        
_batch_size, true);
+        }
+        if (scan_task->is_eos()) { // current scanner is finished, and no more 
data to read
+            _num_finished_scanners++;
+            std::weak_ptr<ScannerDelegate> next_scanner;
+            // submit one of the remaining scanners
+            if (_scanners.try_dequeue(next_scanner)) {
+                // reuse current running scanner, just reset some states.
+                scan_task->reuse_scanner(next_scanner);
+                submit_scan_task(scan_task);
+            } else {
+                // no more scanner to be scheduled
+                // `_free_blocks` serve all running scanners, maybe it's too 
large for the remaining scanners
+                int free_blocks_for_each = _free_blocks.size_approx() / 
_num_running_scanners;
+                _num_running_scanners--;
+                std::lock_guard<std::mutex> fl(_free_blocks_lock);
+                for (int i = 0; i < free_blocks_for_each; ++i) {
+                    vectorized::BlockUPtr removed_block;
+                    if (_free_blocks.try_dequeue(removed_block)) {
+                        _free_blocks_memory_usage -= block->allocated_bytes();
+                        
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                    }
                 }
-                _blocks_queue_added_cv.wait_for(l, 1s);
             }
+        } else {
+            // resubmit current running scanner to read the next block
+            submit_scan_task(scan_task);
         }
+        // scale up
+        _try_to_scale_up();
+    }
 
-        if (state->is_cancelled()) {
-            set_status_on_error(Status::Cancelled("cancelled"), false);
-        }
+    if (_num_finished_scanners == _all_scanners.size() && 
_blocks_queue.empty()) {
+        _set_scanner_done();
+        _is_finished = true;
+    }
+    *eos = done();
+    return Status::OK();
+}
 
-        if (!status().ok()) {
-            return status();
+void ScannerContext::_try_to_scale_up() {
+    // Four criteria to determine whether to increase the parallelism of the 
scanners
+    // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
+    // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get 
blocks
+    // 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough 
memory to scale up
+    // 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
+    if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 &&
+        (_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) &&
+        (_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && // 
duration > 5000ms
+        (_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) *
+                                          WAIT_BLOCK_DURATION_RATIO)) { // too 
large lock time
+        double wait_ratio =
+                (double)_total_wait_block_time / (_last_fetch_time - 
_last_scale_up_time);
+        if (_last_wait_duration_ratio > 0 && wait_ratio > 
_last_wait_duration_ratio * 0.8) {
+            // when _last_wait_duration_ratio > 0, it has scaled up before.
+            // we need to determine if the scale-up is effective:
+            // the wait duration ratio after last scaling up should less than 
80% of `_last_wait_duration_ratio`
+            return;

Review Comment:
   warning: method 'validate_block_schema' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/vec/exec/scan/scanner_context.h:91:
   ```diff
   -     [[nodiscard]] Status validate_block_schema(Block* block);
   +     [[nodiscard]] static Status validate_block_schema(Block* block);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to