This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new da7269c185d [fix](scan) catch exceptions thrown in scanner (#36101)
da7269c185d is described below

commit da7269c185d50e07304e76605d54f31bb2b11660
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Tue Jun 11 16:22:32 2024 +0800

    [fix](scan) catch exceptions thrown in scanner (#36101)
    
    ## Proposed changes
    
    Uncaught exceptions thrown in the scanner cause the BE (backend) to
    crash. Catch them in the submitted scan task and report them through
    the task's status instead.
    
    <!--Describe your changes.-->
---
 be/src/vec/exec/scan/scanner_scheduler.cpp | 37 +++++++++++++++++++++++++-----
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e164a6df3f1..e13ebf7c209 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,8 +136,17 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         }
 
         scanner_delegate->_scanner->start_wait_worker_timer();
-        auto s = ctx->thread_token->submit_func(
-                [this, scanner_ref = scan_task, ctx]() { 
this->_scanner_scan(ctx, scanner_ref); });
+        auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, 
ctx]() {
+            auto status = [&] {
+                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+                return Status::OK();
+            }();
+
+            if (!status.ok()) {
+                scanner_ref->set_status(status);
+                ctx->append_block_to_queue(scanner_ref);
+            }
+        });
         if (!s.ok()) {
             scan_task->set_status(s);
             ctx->append_block_to_queue(scan_task);
@@ -157,16 +166,32 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                     is_local ? ctx->get_simple_scan_scheduler() : 
ctx->get_remote_scan_scheduler();
             auto& thread_pool = is_local ? _local_scan_thread_pool : 
_remote_scan_thread_pool;
             if (scan_sched) {
-                auto work_func = [this, scanner_ref = scan_task, ctx]() {
-                    this->_scanner_scan(ctx, scanner_ref);
+                auto work_func = [scanner_ref = scan_task, ctx]() {
+                    auto status = [&] {
+                        RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, 
scanner_ref));
+                        return Status::OK();
+                    }();
+
+                    if (!status.ok()) {
+                        scanner_ref->set_status(status);
+                        ctx->append_block_to_queue(scanner_ref);
+                    }
                 };
                 SimplifiedScanTask simple_scan_task = {work_func, ctx};
                 return scan_sched->submit_scan_task(simple_scan_task);
             }
 
             PriorityThreadPool::Task task;
-            task.work_function = [this, scanner_ref = scan_task, ctx]() {
-                this->_scanner_scan(ctx, scanner_ref);
+            task.work_function = [scanner_ref = scan_task, ctx]() {
+                auto status = [&] {
+                    RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+                    return Status::OK();
+                }();
+
+                if (!status.ok()) {
+                    scanner_ref->set_status(status);
+                    ctx->append_block_to_queue(scanner_ref);
+                }
             };
             task.priority = nice;
             return thread_pool->offer(task)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to