This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7fb98a1062b [bug](shared scan) Fix use-after-free when enable pipeline shared scanning (#26199) (#26269) 7fb98a1062b is described below commit 7fb98a1062b50a05f1596f36f06e70b3c2220c52 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Nov 2 11:14:45 2023 +0800 [bug](shared scan) Fix use-after-free when enable pipeline shared scanning (#26199) (#26269) When shared scan is enabled, all scanners are created by one instance. When the main instance reaches EOS and quits, all of its state is released. However, other instances may still fetch blocks from those scanners. So we must ensure that the scanners do not depend on any state of the main instance after it quits. --- be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++-- be/src/vec/exec/scan/scanner_context.cpp | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 13a0ef5b671..b98c628368e 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -62,8 +62,6 @@ public: } } - RETURN_IF_ERROR(validate_block_schema((*block).get())); - _current_used_bytes -= (*block)->allocated_bytes(); return Status::OK(); } @@ -79,6 +77,10 @@ public: if (_need_colocate_distribute) { std::vector<uint64_t> hash_vals; for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } // vectorized calculate hash int rows = block->rows(); const auto element_size = _num_parallel_instances; @@ -110,6 +112,10 @@ public: } } else { for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } local_bytes += block->allocated_bytes(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 
a5f575d1750..478d9fb4cb7 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -150,6 +150,10 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& std::lock_guard l(_transfer_lock); auto old_bytes_in_queue = _cur_bytes_in_queue; for (auto& b : blocks) { + auto st = validate_block_schema(b.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } _cur_bytes_in_queue += b->allocated_bytes(); _blocks_queue.push_back(std::move(b)); } @@ -201,8 +205,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo *block = std::move(_blocks_queue.front()); _blocks_queue.pop_front(); - RETURN_IF_ERROR(validate_block_schema((*block).get())); - auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes; _queued_blocks_memory_usage->add(-block_bytes); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org