This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed122b787f [improvement](task exec context) add parent class HasTaskExecutionCtx to own the task ctx (#29388)
2ed122b787f is described below

commit 2ed122b787f673797a4e9e20b4af4e4ece149fcd
Author: yiguolei <676222...@qq.com>
AuthorDate: Tue Jan 2 15:28:27 2024 +0800

    [improvement](task exec context) add parent class HasTaskExecutionCtx to own the task ctx (#29388)
    
    
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/pipeline/task_scheduler.cpp         |  3 +--
 be/src/runtime/task_execution_context.h    | 26 +++++++++++++++++++++++++-
 be/src/vec/exec/scan/scanner_context.cpp   | 10 ++++------
 be/src/vec/exec/scan/scanner_context.h     |  6 ++----
 be/src/vec/exec/scan/scanner_scheduler.cpp |  4 ++--
 5 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e814e4cdf2d..91980aeec7a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -345,8 +345,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
                                     Status exec_status) {
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context itself.
-    std::shared_ptr<TaskExecutionContext> lock_for_context =
-            task->fragment_context()->shared_from_this();
+    auto lock_for_context = task->fragment_context()->shared_from_this();
     auto status = task->try_close(exec_status);
     auto cancel = [&]() {
         task->query_context()->cancel(true, status.to_string(),
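
For readers who have not seen the idiom, the lock_for_context line above relies on std::enable_shared_from_this to pin the fragment context for the whole close path. Below is a minimal, self-contained sketch of that keep-alive pattern; FragmentLike and do_close_work are hypothetical names for illustration, not Doris code.

    #include <iostream>
    #include <memory>

    // Hypothetical stand-in for a fragment context. The real class derives from
    // TaskExecutionContext, which derives from enable_shared_from_this.
    struct FragmentLike : std::enable_shared_from_this<FragmentLike> {
        ~FragmentLike() { std::cout << "fragment destroyed\n"; }
        void do_close_work() { std::cout << "closing, context still alive\n"; }
    };

    void try_close(FragmentLike* fragment) {
        // Promote the raw pointer to a shared_ptr for the rest of the scope.
        // Even if the owner drops its reference while we are closing, this local
        // copy keeps the object alive until the function (and any deferred code
        // inside it) returns.
        auto lock_for_context = fragment->shared_from_this();
        fragment->do_close_work();
    }

    int main() {
        auto fragment = std::make_shared<FragmentLike>();
        try_close(fragment.get());
        return 0;
    }

Without the local shared_ptr, code running after the fragment's owner releases it would touch a destroyed object, which is the crash the comment above describes.
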
diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h
index 08564840f59..c876ed5cb0d 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -21,11 +21,35 @@
 
 namespace doris {
 
-// This class act as a super class of all context like things
+// This class acts as a super class of all context-like things, such as a
+// plan fragment executor or pipelinefragmentcontext or pipelinexfragmentcontext
 class TaskExecutionContext : public std::enable_shared_from_this<TaskExecutionContext> {
 public:
     TaskExecutionContext() = default;
     virtual ~TaskExecutionContext() = default;
 };
 
+using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;
+
+// The Task Execution Context may be a plan fragment executor, pipelinefragmentcontext, or pipelinexfragmentcontext.
+// In a multi-thread scenario, the object is created in the main thread (such as FragmentExecThread), but it
+// may be used in other threads (such as a scanner thread or brpc->sender queue). If the main thread stops and destroys
+// the object, the other threads may core. So the other threads must lock the context to ensure the object exists.
+struct HasTaskExecutionCtx {
+    using Weak = typename TaskExecutionContextSPtr::weak_type;
+
+    HasTaskExecutionCtx(TaskExecutionContextSPtr task_exec_ctx) : task_exec_ctx_(task_exec_ctx) {}
+
+    // Init the task ctx from the state; the state has to own a method named get_task_execution_context(),
+    // like runtime state does
+    template <typename T>
+    HasTaskExecutionCtx(T* state) : task_exec_ctx_(state->get_task_execution_context()) {}
+
+public:
+    inline TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
+
+private:
+    Weak task_exec_ctx_;
+};
+
 } // namespace doris
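
As a rough illustration of how the new helper is meant to be used, here is a minimal sketch built from trimmed-down copies of the two classes above plus a hypothetical WorkerSideCtx consumer; none of this is part of the commit itself.

    #include <iostream>
    #include <memory>

    // Trimmed copies of the classes added above, just enough to compile the sketch.
    struct TaskExecutionContext : std::enable_shared_from_this<TaskExecutionContext> {
        virtual ~TaskExecutionContext() = default;
    };
    using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;

    struct HasTaskExecutionCtx {
        HasTaskExecutionCtx(TaskExecutionContextSPtr ctx) : task_exec_ctx_(ctx) {}
        TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
    private:
        std::weak_ptr<TaskExecutionContext> task_exec_ctx_;
    };

    // Hypothetical object handed to another thread; it only holds a weak reference.
    struct WorkerSideCtx : HasTaskExecutionCtx {
        using HasTaskExecutionCtx::HasTaskExecutionCtx;
        void run_once() {
            // Lock before touching anything owned by the fragment; bail out if
            // the main thread has already destroyed it.
            if (auto guard = task_exec_ctx()) {
                std::cout << "fragment alive, safe to proceed\n";
            } else {
                std::cout << "fragment gone, skip the work\n";
            }
        }
    };

    int main() {
        auto fragment = std::make_shared<TaskExecutionContext>();
        WorkerSideCtx worker(fragment);
        worker.run_once();  // fragment still alive
        fragment.reset();   // owner tears the fragment down
        worker.run_once();  // worker notices and skips
        return 0;
    }

The weak_ptr is the point of the design: the helper never extends the fragment's lifetime on its own, it only lets a worker promote the reference to a shared_ptr for the duration of one piece of work.
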
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 16bb1ce8487..9c87967f505 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
                                pipeline::ScanLocalStateBase* local_state,
                                std::shared_ptr<pipeline::ScanDependency> dependency,
                                std::shared_ptr<pipeline::Dependency> finish_dependency)
-        : _state(state),
+        : HasTaskExecutionCtx(state),
+          _state(state),
           _parent(nullptr),
           _local_state(local_state),
           _output_tuple_desc(output_row_descriptor
@@ -72,8 +73,6 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
           _finish_dependency(finish_dependency) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
-    // Use the task exec context as a lock between scanner threads and fragment exection threads
-    _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
     if (_scanners.empty()) {
@@ -102,7 +101,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
                                const std::list<VScannerSPtr>& scanners, int64_t limit_,
                                int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
-        : _state(state),
+        : HasTaskExecutionCtx(state),
+          _state(state),
           _parent(parent),
           _local_state(local_state),
           _output_tuple_desc(output_row_descriptor
@@ -120,8 +120,6 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
           _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
-    // Use the task exec context as a lock between scanner threads and fragment exection threads
-    _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
     if (_scanners.empty()) {
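
The constructors above forward the RuntimeState to HasTaskExecutionCtx(state), which only requires that the state type expose get_task_execution_context(). A small sketch of that requirement follows; FakeRuntimeState and FakeScannerContext are invented for illustration and are not Doris classes.

    #include <memory>

    struct TaskExecutionContext : std::enable_shared_from_this<TaskExecutionContext> {
        virtual ~TaskExecutionContext() = default;
    };
    using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;

    struct HasTaskExecutionCtx {
        // Any state type with get_task_execution_context() works here, which is
        // how the ScannerContext constructors above pass their RuntimeState.
        template <typename T>
        HasTaskExecutionCtx(T* state) : task_exec_ctx_(state->get_task_execution_context()) {}
        TaskExecutionContextSPtr task_exec_ctx() const { return task_exec_ctx_.lock(); }
    private:
        std::weak_ptr<TaskExecutionContext> task_exec_ctx_;
    };

    // Invented stand-in for RuntimeState: it owns the context and hands out a weak_ptr.
    struct FakeRuntimeState {
        std::shared_ptr<TaskExecutionContext> ctx = std::make_shared<TaskExecutionContext>();
        std::weak_ptr<TaskExecutionContext> get_task_execution_context() const { return ctx; }
    };

    // Invented stand-in for ScannerContext: it just mixes in the helper.
    struct FakeScannerContext : HasTaskExecutionCtx {
        FakeScannerContext(FakeRuntimeState* state) : HasTaskExecutionCtx(state) {}
    };

    int main() {
        FakeRuntimeState state;
        FakeScannerContext scanner_ctx(&state);
        return scanner_ctx.task_exec_ctx() != nullptr ? 0 : 1;
    }
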
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 035d396bf65..8e840a47465 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -65,7 +65,8 @@ class SimplifiedScanScheduler;
 // ScannerContext is also the scheduling unit of ScannerScheduler.
 // ScannerScheduler schedules a ScannerContext at a time,
 // and submits the Scanners to the scanner thread pool for data scanning.
-class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
+class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
+                       public HasTaskExecutionCtx {
     ENABLE_FACTORY_CREATOR(ScannerContext);
 
 public:
@@ -180,8 +181,6 @@ public:
 
     bool _should_reset_thread_name = true;
 
-    std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return _task_exec_ctx; }
-
 private:
     template <typename Parent>
     Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
@@ -199,7 +198,6 @@ protected:
     void _set_scanner_done();
 
     RuntimeState* _state = nullptr;
-    std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
     VScanNode* _parent = nullptr;
     pipeline::ScanLocalStateBase* _local_state = nullptr;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 6ec83e8bd6a..29b53b39353 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -172,7 +172,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
 }
 
 void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
-    auto task_lock = ctx->get_task_execution_context().lock();
+    auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
        // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
         //             << " maybe finished";
@@ -266,7 +266,7 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
 
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
                                      std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) {
-    auto task_lock = ctx->get_task_execution_context().lock();
+    auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
        // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
         //             << " maybe finished";
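
The two call sites above follow the same pattern: lock the weak context first, and drop the scheduled work if the query is already gone. A standalone sketch of that pattern with an actual worker thread is shown below; scanner_thread is a hypothetical function written for illustration, not the real scheduler code.

    #include <iostream>
    #include <memory>
    #include <thread>

    struct TaskExecutionContext : std::enable_shared_from_this<TaskExecutionContext> {
        virtual ~TaskExecutionContext() = default;
    };

    // Hypothetical scanner-thread body: it only ever holds a weak_ptr and must
    // promote it to a shared_ptr before doing any work.
    void scanner_thread(std::weak_ptr<TaskExecutionContext> weak_ctx) {
        auto task_lock = weak_ctx.lock();
        if (task_lock == nullptr) {
            std::cout << "query already finished, drop the scheduled work\n";
            return;
        }
        // While task_lock is held, the fragment cannot be destroyed underneath us.
        std::cout << "scanning with the fragment pinned\n";
    }

    int main() {
        auto fragment = std::make_shared<TaskExecutionContext>();
        std::thread(scanner_thread, std::weak_ptr<TaskExecutionContext>(fragment)).join();
        fragment.reset();  // the main thread tears the fragment down
        std::thread(scanner_thread, std::weak_ptr<TaskExecutionContext>(fragment)).join();
        return 0;
    }

In the real code the second case simply returns, which is what the commented-out warning about the query "maybe finished" refers to.
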


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
