This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new a701945c91 [branch-1.2](improvement) avoid BufferControlBlock block and cancel fail (#16917) a701945c91 is described below commit a701945c91ec29be795130465917c91f165161e3 Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Feb 21 14:13:29 2023 +0800 [branch-1.2](improvement) avoid BufferControlBlock block and cancel fail (#16917) cherry-pick part of #16304 --- be/src/exec/es/es_scroll_parser.h | 1 + be/src/runtime/buffer_control_block.cpp | 2 +- be/src/runtime/buffer_control_block.h | 2 +- be/src/vec/exec/scan/vscanner.cpp | 7 ++++++- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/be/src/exec/es/es_scroll_parser.h b/be/src/exec/es/es_scroll_parser.h index 2803777776..b2fc83595f 100644 --- a/be/src/exec/es/es_scroll_parser.h +++ b/be/src/exec/es/es_scroll_parser.h @@ -54,6 +54,7 @@ private: Status fill_date_slot_with_strval(void* slot, const rapidjson::Value& col, PrimitiveType type); Status fill_date_slot_with_timestamp(void* slot, const rapidjson::Value& col, PrimitiveType type); + private: std::string _scroll_id; int _size; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 03bd6d3466..4c4d148017 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -105,7 +105,7 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result) int num_rows = result->result_batch.rows.size(); while ((!_batch_queue.empty() && (num_rows + _buffer_rows) > _buffer_limit) && !_is_cancelled) { - _data_removal.wait(l); + _data_removal.wait_for(l, std::chrono::seconds(1), [&]() { return _is_cancelled.load(); }); } if (_is_cancelled) { diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 8528f74164..3ad27bfd19 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -106,7 +106,7 @@ private: // result's query id TUniqueId _fragment_id; bool _is_close; - bool _is_cancelled; + std::atomic_bool _is_cancelled; Status _status; int _buffer_rows; int _buffer_limit; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index e0a5814df6..8679f37f89 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -69,7 +69,12 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // record rows return (after filter) for _limit check _num_rows_return += block->rows(); } - } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); + } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) && + raw_rows_read() < raw_rows_threshold); + } + + if (state->is_cancelled()) { + return Status::Cancelled("cancelled"); } // set eof to true if per scanner limit is reached --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org