This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 87912de93f3 [fix](scan) catch exceptions thrown in scanner (#36101) (#37408)
87912de93f3 is described below

commit 87912de93f36d17aeac9c63f66e60628627ff96b
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Jul 12 08:49:39 2024 +0800

    [fix](scan) catch exceptions thrown in scanner (#36101) (#37408)

    ## Proposed changes

    pick #36101

    The uncaught exceptions thrown in the scanner will cause the BE to crash.
---
 be/src/vec/exec/scan/scanner_scheduler.cpp | 37 +++++++++++++++++++++++++-----
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0df501f6919..351912f5b17 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,8 +136,17 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         }
 
         scanner_delegate->_scanner->start_wait_worker_timer();
-        auto s = ctx->thread_token->submit_func(
-                [this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); });
+        auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
+            auto status = [&] {
+                RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+                return Status::OK();
+            }();
+
+            if (!status.ok()) {
+                scanner_ref->set_status(status);
+                ctx->append_block_to_queue(scanner_ref);
+            }
+        });
         if (!s.ok()) {
             scan_task->set_status(s);
             ctx->append_block_to_queue(scan_task);
@@ -157,16 +166,32 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                     is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
             auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
             if (scan_sched) {
-                auto work_func = [this, scanner_ref = scan_task, ctx]() {
-                    this->_scanner_scan(ctx, scanner_ref);
+                auto work_func = [scanner_ref = scan_task, ctx]() {
+                    auto status = [&] {
+                        RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+                        return Status::OK();
+                    }();
+
+                    if (!status.ok()) {
+                        scanner_ref->set_status(status);
+                        ctx->append_block_to_queue(scanner_ref);
+                    }
                 };
                 SimplifiedScanTask simple_scan_task = {work_func, ctx};
                 return scan_sched->submit_scan_task(simple_scan_task);
             }
 
             PriorityThreadPool::Task task;
-            task.work_function = [this, scanner_ref = scan_task, ctx]() {
-                this->_scanner_scan(ctx, scanner_ref);
+            task.work_function = [scanner_ref = scan_task, ctx]() {
+                auto status = [&] {
+                    RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
+                    return Status::OK();
+                }();
+
+                if (!status.ok()) {
+                    scanner_ref->set_status(status);
+                    ctx->append_block_to_queue(scanner_ref);
+                }
             };
             task.priority = nice;
             return thread_pool->offer(task)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org