This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d3317aa33b [Fix](executor)Fix scan entity core #21696 d3317aa33b is described below commit d3317aa33b9a01bb3b5ebad1e761df5ea38e747c Author: wangbo <wan...@apache.org> AuthorDate: Tue Jul 11 15:56:13 2023 +0800 [Fix](executor)Fix scan entity core #21696 After the final call to scan_task.scan_func(), the query should have ended, which means PipelineFragmentContext could be released. Once PipelineFragmentContext is released, accessing its fields such as query_ctx or _state may cause a core dump. However, this only explains core 2: void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, taskgroup::ScanTaskTaskGroupQueue* scan_queue) { while (!_is_closed) { taskgroup::ScanTask scan_task; auto success = scan_queue->take(&scan_task); if (success) { int64_t time_spent = 0; { SCOPED_RAW_TIMER(&time_spent); scan_task.scan_func(); } scan_queue->update_statistics(scan_task, time_spent); } } } --- be/src/vec/exec/scan/scan_task_queue.cpp | 11 +++++++---- be/src/vec/exec/scan/scan_task_queue.h | 4 +++- be/src/vec/exec/scan/scanner_scheduler.cpp | 4 +++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp index 538f77211c..89235b6b7a 100644 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -24,11 +24,14 @@ namespace doris { namespace taskgroup { static void empty_function() {} -ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {} +ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {} ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, - int priority) - : scan_func(std::move(scan_func)), scanner_context(scanner_context), priority(priority) {} + TGSTEntityPtr scan_entity, int priority) + : scan_func(std::move(scan_func)), + scanner_context(scanner_context), + scan_entity(scan_entity), + 
priority(priority) {} ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {} @@ -98,7 +101,7 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { } void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) { - auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); + auto* entity = scan_task.scan_entity; std::unique_lock<std::mutex> lock(_rs_mutex); auto find_entity = _group_entities.find(entity); bool is_in_queue = find_entity != _group_entities.end(); diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h index f3c3b792a4..c694859e3c 100644 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -33,7 +33,8 @@ static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; // Like PriorityThreadPool::Task struct ScanTask { ScanTask(); - ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, int priority); + ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, + TGSTEntityPtr scan_entity, int priority); bool operator<(const ScanTask& o) const { return priority < o.priority; } ScanTask& operator++() { priority += 2; @@ -42,6 +43,7 @@ struct ScanTask { WorkFunction scan_func; vectorized::ScannerContext* scanner_context; + TGSTEntityPtr scan_entity; int priority; }; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index a798d097a2..c37760167f 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -226,7 +226,9 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { auto work_func = [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }; - taskgroup::ScanTask scan_task = {work_func, ctx, nice}; + taskgroup::ScanTask scan_task = { + work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), + nice}; ret = 
_task_group_local_scan_queue->push_back(scan_task); } else { PriorityThreadPool::Task task; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org