This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7fb98a1062b [bug](shared scan) Fix use-after-free when enable pipeline 
shared scanning (#26199) (#26269)
7fb98a1062b is described below

commit 7fb98a1062b50a05f1596f36f06e70b3c2220c52
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Nov 2 11:14:45 2023 +0800

    [bug](shared scan) Fix use-after-free when enable pipeline shared scanning 
(#26199) (#26269)
    
    When shared scan is enabled, all scanners are created by one instance. When 
the main instance reaches EOS and quits, all of its state is released, but 
other instances may still fetch blocks from those scanners. So we must 
ensure that scanners do not depend on any state of the main instance after 
it quits.
---
 be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++--
 be/src/vec/exec/scan/scanner_context.cpp   |  6 ++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 13a0ef5b671..b98c628368e 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -62,8 +62,6 @@ public:
             }
         }
 
-        RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
         _current_used_bytes -= (*block)->allocated_bytes();
         return Status::OK();
     }
@@ -79,6 +77,10 @@ public:
         if (_need_colocate_distribute) {
             std::vector<uint64_t> hash_vals;
             for (const auto& block : blocks) {
+                auto st = validate_block_schema(block.get());
+                if (!st.ok()) {
+                    set_status_on_error(st, false);
+                }
                 // vectorized calculate hash
                 int rows = block->rows();
                 const auto element_size = _num_parallel_instances;
@@ -110,6 +112,10 @@ public:
             }
         } else {
             for (const auto& block : blocks) {
+                auto st = validate_block_schema(block.get());
+                if (!st.ok()) {
+                    set_status_on_error(st, false);
+                }
                 local_bytes += block->allocated_bytes();
             }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index a5f575d1750..478d9fb4cb7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -150,6 +150,10 @@ void 
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
     std::lock_guard l(_transfer_lock);
     auto old_bytes_in_queue = _cur_bytes_in_queue;
     for (auto& b : blocks) {
+        auto st = validate_block_schema(b.get());
+        if (!st.ok()) {
+            set_status_on_error(st, false);
+        }
         _cur_bytes_in_queue += b->allocated_bytes();
         _blocks_queue.push_back(std::move(b));
     }
@@ -201,8 +205,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, vectorized::Blo
         *block = std::move(_blocks_queue.front());
         _blocks_queue.pop_front();
 
-        RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
         _queued_blocks_memory_usage->add(-block_bytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to