This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f23e8bf323f [fix](scanner) Check query status when iterating through rowsets and segments (#41363) f23e8bf323f is described below commit f23e8bf323fcc8c6ef15fbcfe01067eef9122caf Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Sat Sep 28 10:12:25 2024 +0800 [fix](scanner) Check query status when iterating through rowsets and segments (#41363) This prevents a scanner from being unable to exit while it is performing large I/O. --- be/src/olap/rowset/beta_rowset_reader.cpp | 19 +++++++++++++++++++ be/src/vec/olap/block_reader.cpp | 6 ++++++ be/src/vec/olap/vcollect_iterator.cpp | 18 ++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 5fdb2d7c41a..d2c7023f659 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -351,6 +351,11 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) { return Status::Error<END_OF_FILE>("BetaRowsetReader is empty"); } + RuntimeState* runtime_state = nullptr; + if (_read_context != nullptr) { + runtime_state = _read_context->runtime_state; + } + do { auto s = _iterator->next_batch(block); if (!s.ok()) { @@ -359,6 +364,10 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) { } return s; } + + if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { + return runtime_state->cancel_reason(); + } } while (block->empty()); return Status::OK(); @@ -367,6 +376,12 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) { Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) { SCOPED_RAW_TIMER(&_stats->block_fetch_ns); RETURN_IF_ERROR(_init_iterator_once()); + + RuntimeState* runtime_state = nullptr; + if (_read_context != nullptr) { + runtime_state = _read_context->runtime_state; + } + do { auto s = _iterator->next_block_view(block_view); if (!s.ok()) { @@ -375,6 +390,10 @@ 
Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) { } return s; } + + if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { + return runtime_state->cancel_reason(); + } } while (block_view->empty()); return Status::OK(); diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index e2b4ba39e12..9d79b51975c 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -39,6 +39,7 @@ #include "olap/rowset/rowset_reader_context.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" +#include "runtime/runtime_state.h" #include "vec/aggregate_functions/aggregate_function_reader.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" @@ -135,8 +136,13 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { read_params.read_orderby_key_reverse); std::vector<RowsetReaderSharedPtr> valid_rs_readers; + RuntimeState* runtime_state = read_params.runtime_state; for (int i = 0; i < read_params.rs_splits.size(); ++i) { + if (runtime_state != nullptr && runtime_state->is_cancelled()) { + return runtime_state->cancel_reason(); + } + auto& rs_split = read_params.rs_splits[i]; // _vcollect_iter.topn_next() will init rs_reader by itself diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 3eb768ff803..f7017a058df 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -490,6 +490,11 @@ int64_t VCollectIterator::Level0Iterator::version() const { } Status VCollectIterator::Level0Iterator::refresh_current_row() { + RuntimeState* runtime_state = nullptr; + if (_reader != nullptr) { + runtime_state = _reader->_reader_context.runtime_state; + } + do { if (_block == nullptr && !_get_data_by_ref) { _block = std::make_shared<Block>(_schema.create_block( @@ -501,6 +506,10 @@ Status VCollectIterator::Level0Iterator::refresh_current_row() { } else { _reset(); 
auto res = _refresh(); + + if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { + return runtime_state->cancel_reason(); + } if (!res.ok() && !res.is<END_OF_FILE>()) { return res; } @@ -677,8 +686,17 @@ Status VCollectIterator::Level1Iterator::init(bool get_data_by_ref) { } Status VCollectIterator::Level1Iterator::ensure_first_row_ref() { + RuntimeState* runtime_state = nullptr; + if (_reader != nullptr) { + runtime_state = _reader->_reader_context.runtime_state; + } + for (auto iter = _children.begin(); iter != _children.end();) { auto s = (*iter)->ensure_first_row_ref(); + if (runtime_state != nullptr && runtime_state->is_cancelled()) { + return runtime_state->cancel_reason(); + } + if (!s.ok()) { iter = _children.erase(iter); if (!s.is<END_OF_FILE>()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org