This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bcf2683b9d2 [fix](scanner) fix concurrency bugs when scanner is 
stopped or finished (#28650)
bcf2683b9d2 is described below

commit bcf2683b9d2931305c9c15c24147502832f1955a
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu Dec 21 10:37:58 2023 +0800

    [fix](scanner) fix concurrency bugs when scanner is stopped or finished (#28650)
    
    `ScannerContext` could keep scheduling scanners even after it had been
    stopped, and it conflated `_is_finished` with `_should_stop`.
    This commit fixes only the concurrency bugs that occur when a scanner is
    stopped or finished, as reported in https://github.com/apache/doris/pull/28384
---
 be/src/vec/exec/format/orc/vorc_reader.cpp |  5 +++++
 be/src/vec/exec/scan/pip_scanner_context.h |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp   | 15 ++++++++-------
 be/src/vec/exec/scan/vscanner.cpp          |  6 ++----
 4 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 16377027132..c57d3807624 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1403,6 +1403,11 @@ std::string OrcReader::_get_field_name_lower_case(const 
orc::Type* orc_type, int
 }
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_io_ctx && _io_ctx->should_stop) {
+        *eof = true;
+        *read_rows = 0;
+        return Status::OK();
+    }
     if (_push_down_agg_type == TPushAggOp::type::COUNT) {
         auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 8c5818cba9f..309aed96a8c 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -68,7 +68,7 @@ public:
         {
             std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
             if (_blocks_queues[id].empty()) {
-                *eos = _is_finished || _should_stop;
+                *eos = done();
                 return Status::OK();
             }
             if (_process_status.is<ErrorCode::CANCELLED>()) {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 8a6f6de6e65..99f645ca9e5 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -273,7 +273,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         int num_running_scanners = _num_running_scanners;
 
         bool is_scheduled = false;
-        if (to_be_schedule && _num_running_scanners == 0) {
+        if (!done() && to_be_schedule && _num_running_scanners == 0) {
             is_scheduled = true;
             auto state = _scanner_scheduler->submit(shared_from_this());
             if (state.ok()) {
@@ -287,8 +287,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         if (wait) {
             // scanner batch wait time
             SCOPED_TIMER(_scanner_wait_batch_timer);
-            while (!(!_blocks_queue.empty() || _is_finished || !status().ok() 
||
-                     state->is_cancelled())) {
+            while (!(!_blocks_queue.empty() || done() || !status().ok() || 
state->is_cancelled())) {
                 if (!is_scheduled && _num_running_scanners == 0 && 
should_be_scheduled()) {
                     LOG(INFO) << "fatal, cur_bytes_in_queue " << 
cur_bytes_in_queue
                               << ", serving_blocks_num " << serving_blocks_num
@@ -330,7 +329,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
                 }
             }
         } else {
-            *eos = _is_finished;
+            *eos = done();
         }
     }
 
@@ -400,8 +399,7 @@ void ScannerContext::dec_num_scheduling_ctx() {
 
 void ScannerContext::set_ready_to_finish() {
     // `_should_stop == true` means this task has already ended and wait for 
pending finish now.
-    if (_finish_dependency && _should_stop && _num_running_scanners == 0 &&
-        _num_scheduling_ctx == 0) {
+    if (_finish_dependency && done() && _num_running_scanners == 0 && 
_num_scheduling_ctx == 0) {
         _finish_dependency->set_ready();
     }
 }
@@ -524,6 +522,9 @@ std::string ScannerContext::debug_string() {
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard l(_transfer_lock);
+    if (done()) {
+        return;
+    }
     auto state = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
     if (state.ok()) {
@@ -546,7 +547,7 @@ void 
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     _num_running_scanners--;
     set_ready_to_finish();
 
-    if (should_be_scheduled()) {
+    if (!done() && should_be_scheduled()) {
         auto state = _scanner_scheduler->submit(shared_from_this());
         if (state.ok()) {
             _num_scheduling_ctx++;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 2d1328c0fea..a7c0c3c4062 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -113,12 +113,10 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
     }
-
+    *eof = *eof || _should_stop;
     // set eof to true if per scanner limit is reached
     // currently for query: ORDER BY key LIMIT n
-    if (_limit > 0 && _num_rows_return >= _limit) {
-        *eof = true;
-    }
+    *eof = *eof || (_limit > 0 && _num_rows_return >= _limit);
 
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to