This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b3d3ffa2de [Bug](pipeline) adjust scanner scheduler.submit and 
_num_scheduling_ctx maintain (#21843)
b3d3ffa2de is described below

commit b3d3ffa2de005e4aa4317e30f39b7ab7df57f05b
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Jul 18 11:55:21 2023 +0800

    [Bug](pipeline) adjust scanner scheduler.submit and _num_scheduling_ctx 
maintain (#21843)
    
    Adjust ScannerScheduler::submit and the maintenance of _num_scheduling_ctx.
---
 be/src/olap/tablet_schema_cache.cpp        |  1 -
 be/src/vec/exec/scan/pip_scanner_context.h |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp   | 38 ++++++++++++++++++++----------
 be/src/vec/exec/scan/scanner_context.h     | 11 ++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp |  6 +++--
 be/src/vec/exec/scan/scanner_scheduler.h   |  3 +--
 6 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/be/src/olap/tablet_schema_cache.cpp 
b/be/src/olap/tablet_schema_cache.cpp
index 24aa72a995..ee14358495 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -42,7 +42,6 @@ void TabletSchemaCache::stop() {
     while (!_is_stopped) {
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
-    LOG(INFO) << "xxx stopped";
 }
 
 /**
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 731c3bb926..c720c22d04 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -45,7 +45,7 @@ public:
         {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
-                _process_status = Status::Cancelled("cancelled");
+                set_status_on_error(Status::Cancelled("cancelled"), false);
             }
 
             if (!_process_status.ok()) {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index b8e5847a15..049283647c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -28,6 +28,7 @@
 #include <utility>
 
 #include "common/config.h"
+#include "common/status.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
@@ -188,8 +189,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
     // At this point, consumers are required to trigger new scheduling to 
ensure that
     // data can be continuously fetched.
     if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
-        _num_scheduling_ctx++;
-        _scanner_scheduler->submit(this);
+        auto state = _scanner_scheduler->submit(this);
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
+        }
     }
     // Wait for block from queue
     if (wait) {
@@ -201,11 +206,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
     }
 
     if (state->is_cancelled()) {
-        _process_status = Status::Cancelled("cancelled");
+        set_status_on_error(Status::Cancelled("cancelled"), false);
     }
 
-    if (!_process_status.ok()) {
-        return _process_status;
+    if (!status().ok()) {
+        return status();
     }
 
     if (!_blocks_queue.empty()) {
@@ -221,12 +226,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
     return Status::OK();
 }
 
-bool ScannerContext::set_status_on_error(const Status& status) {
-    std::lock_guard l(_transfer_lock);
+bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) 
{
+    std::unique_lock l(_transfer_lock, std::defer_lock);
+    if (need_lock) {
+        l.lock();
+    }
     if (_process_status.ok()) {
         _process_status = status;
         _status_error = true;
         _blocks_queue_added_cv.notify_one();
+        _should_stop = true;
         return true;
     }
     return false;
@@ -326,10 +335,12 @@ std::string ScannerContext::debug_string() {
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard l(_transfer_lock);
-    auto submit_st = _scanner_scheduler->submit(this);
+    auto state = _scanner_scheduler->submit(this);
     //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
-    if (submit_st.ok()) {
+    if (state.ok()) {
         _num_scheduling_ctx++;
+    } else {
+        set_status_on_error(state, false);
     }
 }
 
@@ -340,10 +351,11 @@ void 
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     }
     std::lock_guard l(_transfer_lock);
     if (has_enough_space_in_blocks_queue()) {
-        _num_scheduling_ctx++;
-        auto submit_st = _scanner_scheduler->submit(this);
-        if (!submit_st.ok()) {
-            _num_scheduling_ctx--;
+        auto state = _scanner_scheduler->submit(this);
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
         }
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index fa264c756a..db36dfe22f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -86,10 +86,12 @@ public:
     // to return the scanner to the list for next scheduling.
     void push_back_scanner_and_reschedule(VScannerSPtr scanner);
 
-    bool set_status_on_error(const Status& status);
+    bool set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
-        std::lock_guard l(_transfer_lock);
+        if (_process_status.is<ErrorCode::END_OF_FILE>()) {
+            return Status::OK();
+        }
         return _process_status;
     }
 
@@ -102,10 +104,7 @@ public:
     }
 
     // Return true if this ScannerContext need no more process
-    virtual bool done() {
-        std::unique_lock l(_transfer_lock);
-        return _is_finished || _should_stop || !_process_status.ok();
-    }
+    virtual bool done() { return _is_finished || _should_stop; }
 
     // Update the running num of scanners and contexts
     void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index c37760167f..e6bec4f9d9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -53,7 +53,7 @@
 
 namespace doris::vectorized {
 
-ScannerScheduler::ScannerScheduler() {}
+ScannerScheduler::ScannerScheduler() = default;
 
 ScannerScheduler::~ScannerScheduler() {
     if (!_is_init) {
@@ -135,6 +135,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
 }
 
 Status ScannerScheduler::submit(ScannerContext* ctx) {
+    if (ctx->done()) {
+        return Status::EndOfFile("ScannerContext is done");
+    }
     if (ctx->queue_idx == -1) {
         ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
     }
@@ -163,7 +166,6 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
         // If ctx is done, no need to schedule it again.
         // But should notice that there may still scanners running in scanner 
pool.
     }
-    return;
 }
 
 [[maybe_unused]] static void* run_scanner_bthread(void* arg) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index d88c844a3e..ccc809becd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -67,7 +67,7 @@ public:
 
     Status init(ExecEnv* env);
 
-    Status submit(ScannerContext* ctx);
+    [[nodiscard]] Status submit(ScannerContext* ctx);
 
     std::unique_ptr<ThreadPoolToken> 
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int 
max_concurrency);
@@ -86,7 +86,6 @@ private:
     void _task_group_scanner_scan(ScannerScheduler* scheduler,
                                   taskgroup::ScanTaskTaskGroupQueue* 
scan_queue);
 
-private:
     // Scheduling queue number.
     // TODO: make it configurable.
     static const int QUEUE_NUM = 4;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to