This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f23e8bf323f [fix](scanner) Check query status when iterating through 
rowsets and segments (#41363)
f23e8bf323f is described below

commit f23e8bf323fcc8c6ef15fbcfe01067eef9122caf
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Sat Sep 28 10:12:25 2024 +0800

    [fix](scanner) Check query status when iterating through rowsets and 
segments (#41363)
    
    To avoid scanner can not exit when doing large IO.
---
 be/src/olap/rowset/beta_rowset_reader.cpp | 19 +++++++++++++++++++
 be/src/vec/olap/block_reader.cpp          |  6 ++++++
 be/src/vec/olap/vcollect_iterator.cpp     | 18 ++++++++++++++++++
 3 files changed, 43 insertions(+)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 5fdb2d7c41a..d2c7023f659 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -351,6 +351,11 @@ Status BetaRowsetReader::next_block(vectorized::Block* 
block) {
         return Status::Error<END_OF_FILE>("BetaRowsetReader is empty");
     }
 
+    RuntimeState* runtime_state = nullptr;
+    if (_read_context != nullptr) {
+        runtime_state = _read_context->runtime_state;
+    }
+
     do {
         auto s = _iterator->next_batch(block);
         if (!s.ok()) {
@@ -359,6 +364,10 @@ Status BetaRowsetReader::next_block(vectorized::Block* 
block) {
             }
             return s;
         }
+
+        if (runtime_state != nullptr && runtime_state->is_cancelled()) 
[[unlikely]] {
+            return runtime_state->cancel_reason();
+        }
     } while (block->empty());
 
     return Status::OK();
@@ -367,6 +376,12 @@ Status BetaRowsetReader::next_block(vectorized::Block* 
block) {
 Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
     SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
     RETURN_IF_ERROR(_init_iterator_once());
+
+    RuntimeState* runtime_state = nullptr;
+    if (_read_context != nullptr) {
+        runtime_state = _read_context->runtime_state;
+    }
+
     do {
         auto s = _iterator->next_block_view(block_view);
         if (!s.ok()) {
@@ -375,6 +390,10 @@ Status 
BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
             }
             return s;
         }
+
+        if (runtime_state != nullptr && runtime_state->is_cancelled()) 
[[unlikely]] {
+            return runtime_state->cancel_reason();
+        }
     } while (block_view->empty());
 
     return Status::OK();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index e2b4ba39e12..9d79b51975c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -39,6 +39,7 @@
 #include "olap/rowset/rowset_reader_context.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
+#include "runtime/runtime_state.h"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
@@ -135,8 +136,13 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params) {
                         read_params.read_orderby_key_reverse);
 
     std::vector<RowsetReaderSharedPtr> valid_rs_readers;
+    RuntimeState* runtime_state = read_params.runtime_state;
 
     for (int i = 0; i < read_params.rs_splits.size(); ++i) {
+        if (runtime_state != nullptr && runtime_state->is_cancelled()) {
+            return runtime_state->cancel_reason();
+        }
+
         auto& rs_split = read_params.rs_splits[i];
 
         // _vcollect_iter.topn_next() will init rs_reader by itself
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 3eb768ff803..f7017a058df 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -490,6 +490,11 @@ int64_t VCollectIterator::Level0Iterator::version() const {
 }
 
 Status VCollectIterator::Level0Iterator::refresh_current_row() {
+    RuntimeState* runtime_state = nullptr;
+    if (_reader != nullptr) {
+        runtime_state = _reader->_reader_context.runtime_state;
+    }
+
     do {
         if (_block == nullptr && !_get_data_by_ref) {
             _block = std::make_shared<Block>(_schema.create_block(
@@ -501,6 +506,10 @@ Status 
VCollectIterator::Level0Iterator::refresh_current_row() {
         } else {
             _reset();
             auto res = _refresh();
+
+            if (runtime_state != nullptr && runtime_state->is_cancelled()) 
[[unlikely]] {
+                return runtime_state->cancel_reason();
+            }
             if (!res.ok() && !res.is<END_OF_FILE>()) {
                 return res;
             }
@@ -677,8 +686,17 @@ Status VCollectIterator::Level1Iterator::init(bool 
get_data_by_ref) {
 }
 
 Status VCollectIterator::Level1Iterator::ensure_first_row_ref() {
+    RuntimeState* runtime_state = nullptr;
+    if (_reader != nullptr) {
+        runtime_state = _reader->_reader_context.runtime_state;
+    }
+
     for (auto iter = _children.begin(); iter != _children.end();) {
         auto s = (*iter)->ensure_first_row_ref();
+        if (runtime_state != nullptr && runtime_state->is_cancelled()) {
+            return runtime_state->cancel_reason();
+        }
+
         if (!s.ok()) {
             iter = _children.erase(iter);
             if (!s.is<END_OF_FILE>()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to