This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bcf2683b9d2 [fix](scanner) fix concurrency bugs when scanner is stopped or finished (#28650) bcf2683b9d2 is described below commit bcf2683b9d2931305c9c15c24147502832f1955a Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Thu Dec 21 10:37:58 2023 +0800 [fix](scanner) fix concurrency bugs when scanner is stopped or finished (#28650) `ScannerContext` will schedule scanners even after stopped, and confused with `_is_finished` and `_should_stop`. Only Fix the concurrency bugs when scanner is stopped or finished reported in https://github.com/apache/doris/pull/28384 --- be/src/vec/exec/format/orc/vorc_reader.cpp | 5 +++++ be/src/vec/exec/scan/pip_scanner_context.h | 2 +- be/src/vec/exec/scan/scanner_context.cpp | 15 ++++++++------- be/src/vec/exec/scan/vscanner.cpp | 6 ++---- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 16377027132..c57d3807624 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1403,6 +1403,11 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_io_ctx && _io_ctx->should_stop) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } if (_push_down_agg_type == TPushAggOp::type::COUNT) { auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size); diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 8c5818cba9f..309aed96a8c 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -68,7 +68,7 @@ public: { std::unique_lock<std::mutex> l(*_queue_mutexs[id]); if (_blocks_queues[id].empty()) { - *eos = _is_finished || _should_stop; + *eos = done(); return Status::OK(); } if (_process_status.is<ErrorCode::CANCELLED>()) { diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 8a6f6de6e65..99f645ca9e5 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -273,7 +273,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo int num_running_scanners = _num_running_scanners; bool is_scheduled = false; - if (to_be_schedule && _num_running_scanners == 0) { + if (!done() && to_be_schedule && _num_running_scanners == 0) { is_scheduled = true; auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { @@ -287,8 +287,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo if (wait) { // scanner batch wait time SCOPED_TIMER(_scanner_wait_batch_timer); - while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || - state->is_cancelled())) { + while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) { if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue << ", serving_blocks_num " << serving_blocks_num @@ -330,7 +329,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo } } } else { - *eos = _is_finished; + *eos = done(); } } @@ -400,8 +399,7 @@ void ScannerContext::dec_num_scheduling_ctx() { void ScannerContext::set_ready_to_finish() { // `_should_stop == true` means this task has already ended and wait for pending finish now. - if (_finish_dependency && _should_stop && _num_running_scanners == 0 && - _num_scheduling_ctx == 0) { + if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) { _finish_dependency->set_ready(); } } @@ -524,6 +522,9 @@ std::string ScannerContext::debug_string() { void ScannerContext::reschedule_scanner_ctx() { std::lock_guard l(_transfer_lock); + if (done()) { + return; + } auto state = _scanner_scheduler->submit(shared_from_this()); //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? if (state.ok()) { @@ -546,7 +547,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { _num_running_scanners--; set_ready_to_finish(); - if (should_be_scheduled()) { + if (!done() && should_be_scheduled()) { auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { _num_scheduling_ctx++; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 2d1328c0fea..a7c0c3c4062 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -113,12 +113,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { if (state->is_cancelled()) { return Status::Cancelled("cancelled"); } - + *eof = *eof || _should_stop; // set eof to true if per scanner limit is reached // currently for query: ORDER BY key LIMIT n - if (_limit > 0 && _num_rows_return >= _limit) { - *eof = true; - } + *eof = *eof || (_limit > 0 && _num_rows_return >= _limit); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org