This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new bbdeef64178 branch-4.0: [fix](debug) Fix illegal access to runtime states #56439 (#56547)
bbdeef64178 is described below

commit bbdeef64178abf93c90795ebe48bddf7b247a4c8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Sep 28 19:11:55 2025 +0800

    branch-4.0: [fix](debug) Fix illegal access to runtime states #56439 (#56547)
    
    Cherry-picked from #56439
    
    Co-authored-by: Gabriel <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp | 76 ++++++++++-----------------
 be/src/pipeline/pipeline_fragment_context.h   | 38 +++++++-------
 be/src/pipeline/pipeline_task.cpp             |  9 +++-
 3 files changed, 56 insertions(+), 67 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 05f698ffeba..ec048d0db51 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     auto st = _query_ctx->exec_status();
     for (size_t i = 0; i < _tasks.size(); i++) {
         if (!_tasks[i].empty()) {
-            _call_back(_tasks[i].front()->runtime_state(), &st);
-        }
-    }
-    for (auto& runtime_states : _task_runtime_states) {
-        for (auto& runtime_state : runtime_states) {
-            runtime_state.reset();
+            _call_back(_tasks[i].front().first->runtime_state(), &st);
         }
     }
     _tasks.clear();
@@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
 
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
-            task->terminate();
+            task.first->terminate();
         }
     }
 }
@@ -379,9 +374,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
     const auto target_size = _params.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_mgr_map.resize(target_size);
-    _task_runtime_states.resize(_pipelines.size());
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
-        _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
         _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = 
_pipelines[pip_idx].get();
     }
     auto pipeline_id_to_profile = 
_runtime_state->build_pipeline_profile(_pipelines.size());
@@ -416,14 +409,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto& pipeline = _pipelines[pip_idx];
             if (pipeline->num_tasks() > 1 || i == 0) {
-                DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
-                        << 
print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
-                        << pipeline->debug_string();
-                _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
+                auto task_runtime_state = RuntimeState::create_unique(
                         local_params.fragment_instance_id, _params.query_id, 
_params.fragment_id,
                         _params.query_options, _query_ctx->query_globals, 
_exec_env,
                         _query_ctx.get());
-                auto& task_runtime_state = _task_runtime_states[pip_idx][i];
                 {
                     // Initialize runtime state for this task
                     
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
@@ -470,7 +459,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
                         pipeline_id_to_profile[pip_idx].get(), 
get_shared_state(pipeline), i);
                 pipeline->incr_created_tasks(i, task.get());
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
-                _tasks[i].emplace_back(std::move(task));
+                _tasks[i].emplace_back(
+                        std::pair<std::shared_ptr<PipelineTask>, 
std::unique_ptr<RuntimeState>> {
+                                std::move(task), 
std::move(task_runtime_state)});
             }
         }
 
@@ -1703,7 +1694,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
     for (auto& task : _tasks) {
         for (auto& t : task) {
-            st = scheduler->submit(t);
+            st = scheduler->submit(t.first);
             DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                             { st = 
Status::Aborted("PipelineFragmentContext.submit.failed"); });
             if (!st) {
@@ -1810,12 +1801,9 @@ std::string PipelineFragmentContext::get_load_error_url() {
     if (const auto& str = _runtime_state->get_error_log_file_path(); 
!str.empty()) {
         return to_load_error_http_path(str);
     }
-    for (auto& task_states : _task_runtime_states) {
-        for (auto& task_state : task_states) {
-            if (!task_state) {
-                continue;
-            }
-            if (const auto& str = task_state->get_error_log_file_path(); 
!str.empty()) {
+    for (auto& tasks : _tasks) {
+        for (auto& task : tasks) {
+            if (const auto& str = task.second->get_error_log_file_path(); 
!str.empty()) {
                 return to_load_error_http_path(str);
             }
         }
@@ -1827,12 +1815,9 @@ std::string PipelineFragmentContext::get_first_error_msg() {
     if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) 
{
         return str;
     }
-    for (auto& task_states : _task_runtime_states) {
-        for (auto& task_state : task_states) {
-            if (!task_state) {
-                continue;
-            }
-            if (const auto& str = task_state->get_first_error_msg(); 
!str.empty()) {
+    for (auto& tasks : _tasks) {
+        for (auto& task : tasks) {
+            if (const auto& str = task.second->get_first_error_msg(); 
!str.empty()) {
                 return str;
             }
         }
@@ -1862,11 +1847,9 @@ Status PipelineFragmentContext::send_report(bool done) {
 
     std::vector<RuntimeState*> runtime_states;
 
-    for (auto& task_states : _task_runtime_states) {
-        for (auto& task_state : task_states) {
-            if (task_state) {
-                runtime_states.push_back(task_state.get());
-            }
+    for (auto& tasks : _tasks) {
+        for (auto& task : tasks) {
+            runtime_states.push_back(task.second.get());
         }
     }
 
@@ -1900,15 +1883,15 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
     // here to traverse the vector.
     for (const auto& task_instances : _tasks) {
         for (const auto& task : task_instances) {
-            if (task->is_running()) {
+            if (task.first->is_running()) {
                 LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
-                                      << " is running, task: " << 
(void*)task.get()
-                                      << ", is_running: " << 
task->is_running();
+                                      << " is running, task: " << 
(void*)task.first.get()
+                                      << ", is_running: " << 
task.first->is_running();
                 *has_running_task = true;
                 return 0;
             }
 
-            size_t revocable_size = task->get_revocable_size();
+            size_t revocable_size = task.first->get_revocable_size();
             if (revocable_size >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
                 res += revocable_size;
             }
@@ -1921,9 +1904,9 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
     std::vector<PipelineTask*> revocable_tasks;
     for (const auto& task_instances : _tasks) {
         for (const auto& task : task_instances) {
-            size_t revocable_size_ = task->get_revocable_size();
+            size_t revocable_size_ = task.first->get_revocable_size();
             if (revocable_size_ >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                revocable_tasks.emplace_back(task.get());
+                revocable_tasks.emplace_back(task.first.get());
             }
         }
     }
@@ -1936,9 +1919,8 @@ std::string PipelineFragmentContext::debug_string() {
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
-            fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i,
-                           _tasks[j][i]->debug_string(),
-                           
_task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string());
+            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
+                           _tasks[j][i].first->debug_string());
         }
     }
 
@@ -1988,16 +1970,16 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const {
         return nullptr;
     }
 
-    for (const auto& runtime_states : _task_runtime_states) {
-        for (const auto& runtime_state : runtime_states) {
-            if (runtime_state == nullptr || runtime_state->runtime_profile() 
== nullptr) {
+    for (const auto& tasks : _tasks) {
+        for (const auto& task : tasks) {
+            if (task.second->runtime_profile() == nullptr) {
                 continue;
             }
 
             auto tmp_load_channel_profile = 
std::make_shared<TRuntimeProfileTree>();
 
-            
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
-                                                        
_runtime_state->profile_level());
+            
task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
+                                                      
_runtime_state->profile_level());
             
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
         }
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f3ee112b0a0..81b3f57b01f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,7 +117,7 @@ public:
     void clear_finished_tasks() {
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
-                _tasks[j][i]->stop_if_finished();
+                _tasks[j][i].first->stop_if_finished();
             }
         }
     }
@@ -228,8 +228,25 @@ private:
     bool _use_serial_source = false;
 
     OperatorPtr _root_op = nullptr;
-    // this is a [n * m] matrix. n is parallelism of pipeline engine and m is 
the number of pipelines.
-    std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
+    //
+    /**
+     * Matrix stores tasks with local runtime states.
+     * This is a [n * m] matrix. n is parallelism of pipeline engine and m is 
the number of pipelines.
+     *
+     * 2-D matrix:
+     * +-------------------------+------------+-------+
+     * |            | Pipeline 0 | Pipeline 1 |  ...  |
+     * +------------+------------+------------+-------+
+     * | Instance 0 |  task 0-0  |  task 0-1  |  ...  |
+     * +------------+------------+------------+-------+
+     * | Instance 1 |  task 1-0  |  task 1-1  |  ...  |
+     * +------------+------------+------------+-------+
+     * | ...                                          |
+     * +--------------------------------------+-------+
+     */
+    std::vector<
+            std::vector<std::pair<std::shared_ptr<PipelineTask>, 
std::unique_ptr<RuntimeState>>>>
+            _tasks;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
@@ -299,21 +316,6 @@ private:
     //    - _task_runtime_states is at the task level, unique to each task.
 
     std::vector<TUniqueId> _fragment_instance_ids;
-    /**
-     * Local runtime states for each task.
-     *
-     * 2-D matrix:
-     * +-------------------------+------------+-------+
-     * |            | Instance 0 | Instance 1 |  ...  |
-     * +------------+------------+------------+-------+
-     * | Pipeline 0 |  task 0-0  |  task 0-1  |  ...  |
-     * +------------+------------+------------+-------+
-     * | Pipeline 1 |  task 1-0  |  task 1-1  |  ...  |
-     * +------------+------------+------------+-------+
-     * | ...                                          |
-     * +--------------------------------------+-------+
-     */
-    std::vector<std::vector<std::unique_ptr<RuntimeState>>> 
_task_runtime_states;
 
     // Total instance num running on all BEs
     int _total_instances = -1;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 2da386e83a1..e069cb840d1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -735,10 +735,15 @@ std::string PipelineTask::debug_string() {
         return fmt::to_string(debug_string_buffer);
     }
     auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
-    fmt::format_to(debug_string_buffer,
-                   " elapse time = {}s, block dependency = [{}]\noperators: ", 
elapsed,
+    fmt::format_to(debug_string_buffer, " elapse time = {}s, block dependency 
= [{}]\n", elapsed,
                    cur_blocked_dep && !is_finalized() ? 
cur_blocked_dep->debug_string() : "NULL");
 
+    if (_state && _state->local_runtime_filter_mgr()) {
+        fmt::format_to(debug_string_buffer, "local_runtime_filter_mgr: [{}]\n",
+                       _state->local_runtime_filter_mgr()->debug_string());
+    }
+
+    fmt::format_to(debug_string_buffer, "operators: ");
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !is_finalized()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to