This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3317aa33b [Fix](executor)Fix scan entity core #21696
d3317aa33b is described below

commit d3317aa33b9a01bb3b5ebad1e761df5ea38e747c
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Jul 11 15:56:13 2023 +0800

    [Fix](executor)Fix scan entity core #21696
    
    After the last call to scan_task.scan_func(), the scan should have ended,
    which means the PipelineFragmentContext could already be released.
    Then, after the PipelineFragmentContext is released, accessing its fields
    such as query_ctx or _state may cause a core dump.
    However, this can only explain core 2.
    
    void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
                                                    
taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
        while (!_is_closed) {
            taskgroup::ScanTask scan_task;
            auto success = scan_queue->take(&scan_task);
            if (success) {
                int64_t time_spent = 0;
                {
                    SCOPED_RAW_TIMER(&time_spent);
                    scan_task.scan_func();
                }
                scan_queue->update_statistics(scan_task, time_spent);
            }
        }
    }
---
 be/src/vec/exec/scan/scan_task_queue.cpp   | 11 +++++++----
 be/src/vec/exec/scan/scan_task_queue.h     |  4 +++-
 be/src/vec/exec/scan/scanner_scheduler.cpp |  4 +++-
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp 
b/be/src/vec/exec/scan/scan_task_queue.cpp
index 538f77211c..89235b6b7a 100644
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -24,11 +24,14 @@
 namespace doris {
 namespace taskgroup {
 static void empty_function() {}
-ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {}
+ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {}
 
 ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* 
scanner_context,
-                   int priority)
-        : scan_func(std::move(scan_func)), scanner_context(scanner_context), 
priority(priority) {}
+                   TGSTEntityPtr scan_entity, int priority)
+        : scan_func(std::move(scan_func)),
+          scanner_context(scanner_context),
+          scan_entity(scan_entity),
+          priority(priority) {}
 
 ScanTaskQueue::ScanTaskQueue() : 
_queue(config::doris_scanner_thread_pool_queue_size) {}
 
@@ -98,7 +101,7 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
 }
 
 void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t 
time_spent) {
-    auto* entity = 
scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+    auto* entity = scan_task.scan_entity;
     std::unique_lock<std::mutex> lock(_rs_mutex);
     auto find_entity = _group_entities.find(entity);
     bool is_in_queue = find_entity != _group_entities.end();
diff --git a/be/src/vec/exec/scan/scan_task_queue.h 
b/be/src/vec/exec/scan/scan_task_queue.h
index f3c3b792a4..c694859e3c 100644
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -33,7 +33,8 @@ static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 // Like PriorityThreadPool::Task
 struct ScanTask {
     ScanTask();
-    ScanTask(WorkFunction scan_func, vectorized::ScannerContext* 
scanner_context, int priority);
+    ScanTask(WorkFunction scan_func, vectorized::ScannerContext* 
scanner_context,
+             TGSTEntityPtr scan_entity, int priority);
     bool operator<(const ScanTask& o) const { return priority < o.priority; }
     ScanTask& operator++() {
         priority += 2;
@@ -42,6 +43,7 @@ struct ScanTask {
 
     WorkFunction scan_func;
     vectorized::ScannerContext* scanner_context;
+    TGSTEntityPtr scan_entity;
     int priority;
 };
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a798d097a2..c37760167f 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -226,7 +226,9 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
                         auto work_func = [this, scanner = *iter, ctx] {
                             this->_scanner_scan(this, ctx, scanner);
                         };
-                        taskgroup::ScanTask scan_task = {work_func, ctx, nice};
+                        taskgroup::ScanTask scan_task = {
+                                work_func, ctx, 
ctx->get_task_group()->local_scan_task_entity(),
+                                nice};
                         ret = 
_task_group_local_scan_queue->push_back(scan_task);
                     } else {
                         PriorityThreadPool::Task task;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to