This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new bbdeef64178 branch-4.0: [fix](debug) Fix illegal access to runtime
states #56439 (#56547)
bbdeef64178 is described below
commit bbdeef64178abf93c90795ebe48bddf7b247a4c8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Sep 28 19:11:55 2025 +0800
branch-4.0: [fix](debug) Fix illegal access to runtime states #56439
(#56547)
Cherry-picked from #56439
Co-authored-by: Gabriel <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 76 ++++++++++-----------------
be/src/pipeline/pipeline_fragment_context.h | 38 +++++++-------
be/src/pipeline/pipeline_task.cpp | 9 +++-
3 files changed, 56 insertions(+), 67 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 05f698ffeba..ec048d0db51 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() {
auto st = _query_ctx->exec_status();
for (size_t i = 0; i < _tasks.size(); i++) {
if (!_tasks[i].empty()) {
- _call_back(_tasks[i].front()->runtime_state(), &st);
- }
- }
- for (auto& runtime_states : _task_runtime_states) {
- for (auto& runtime_state : runtime_states) {
- runtime_state.reset();
+ _call_back(_tasks[i].front().first->runtime_state(), &st);
}
}
_tasks.clear();
@@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
- task->terminate();
+ task.first->terminate();
}
}
}
@@ -379,9 +374,7 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
const auto target_size = _params.local_params.size();
_tasks.resize(target_size);
_runtime_filter_mgr_map.resize(target_size);
- _task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
- _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
_pip_id_to_pipeline[_pipelines[pip_idx]->id()] =
_pipelines[pip_idx].get();
}
auto pipeline_id_to_profile =
_runtime_state->build_pipeline_profile(_pipelines.size());
@@ -416,14 +409,10 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
if (pipeline->num_tasks() > 1 || i == 0) {
- DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
- <<
print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
- << pipeline->debug_string();
- _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
+ auto task_runtime_state = RuntimeState::create_unique(
local_params.fragment_instance_id, _params.query_id,
_params.fragment_id,
_params.query_options, _query_ctx->query_globals,
_exec_env,
_query_ctx.get());
- auto& task_runtime_state = _task_runtime_states[pip_idx][i];
{
// Initialize runtime state for this task
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
@@ -470,7 +459,9 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
pipeline_id_to_profile[pip_idx].get(),
get_shared_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
- _tasks[i].emplace_back(std::move(task));
+ _tasks[i].emplace_back(
+ std::pair<std::shared_ptr<PipelineTask>,
std::unique_ptr<RuntimeState>> {
+ std::move(task),
std::move(task_runtime_state)});
}
}
@@ -1703,7 +1694,7 @@ Status PipelineFragmentContext::submit() {
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
- st = scheduler->submit(t);
+ st = scheduler->submit(t.first);
DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
{ st =
Status::Aborted("PipelineFragmentContext.submit.failed"); });
if (!st) {
@@ -1810,12 +1801,9 @@ std::string
PipelineFragmentContext::get_load_error_url() {
if (const auto& str = _runtime_state->get_error_log_file_path();
!str.empty()) {
return to_load_error_http_path(str);
}
- for (auto& task_states : _task_runtime_states) {
- for (auto& task_state : task_states) {
- if (!task_state) {
- continue;
- }
- if (const auto& str = task_state->get_error_log_file_path();
!str.empty()) {
+ for (auto& tasks : _tasks) {
+ for (auto& task : tasks) {
+ if (const auto& str = task.second->get_error_log_file_path();
!str.empty()) {
return to_load_error_http_path(str);
}
}
@@ -1827,12 +1815,9 @@ std::string
PipelineFragmentContext::get_first_error_msg() {
if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty())
{
return str;
}
- for (auto& task_states : _task_runtime_states) {
- for (auto& task_state : task_states) {
- if (!task_state) {
- continue;
- }
- if (const auto& str = task_state->get_first_error_msg();
!str.empty()) {
+ for (auto& tasks : _tasks) {
+ for (auto& task : tasks) {
+ if (const auto& str = task.second->get_first_error_msg();
!str.empty()) {
return str;
}
}
@@ -1862,11 +1847,9 @@ Status PipelineFragmentContext::send_report(bool done) {
std::vector<RuntimeState*> runtime_states;
- for (auto& task_states : _task_runtime_states) {
- for (auto& task_state : task_states) {
- if (task_state) {
- runtime_states.push_back(task_state.get());
- }
+ for (auto& tasks : _tasks) {
+ for (auto& task : tasks) {
+ runtime_states.push_back(task.second.get());
}
}
@@ -1900,15 +1883,15 @@ size_t
PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
// here to traverse the vector.
for (const auto& task_instances : _tasks) {
for (const auto& task : task_instances) {
- if (task->is_running()) {
+ if (task.first->is_running()) {
LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
- << " is running, task: " <<
(void*)task.get()
- << ", is_running: " <<
task->is_running();
+ << " is running, task: " <<
(void*)task.first.get()
+ << ", is_running: " <<
task.first->is_running();
*has_running_task = true;
return 0;
}
- size_t revocable_size = task->get_revocable_size();
+ size_t revocable_size = task.first->get_revocable_size();
if (revocable_size >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
res += revocable_size;
}
@@ -1921,9 +1904,9 @@ std::vector<PipelineTask*>
PipelineFragmentContext::get_revocable_tasks() const
std::vector<PipelineTask*> revocable_tasks;
for (const auto& task_instances : _tasks) {
for (const auto& task : task_instances) {
- size_t revocable_size_ = task->get_revocable_size();
+ size_t revocable_size_ = task.first->get_revocable_size();
if (revocable_size_ >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- revocable_tasks.emplace_back(task.get());
+ revocable_tasks.emplace_back(task.first.get());
}
}
}
@@ -1936,9 +1919,8 @@ std::string PipelineFragmentContext::debug_string() {
for (size_t j = 0; j < _tasks.size(); j++) {
fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
for (size_t i = 0; i < _tasks[j].size(); i++) {
- fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i,
- _tasks[j][i]->debug_string(),
-
_task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string());
+ fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
+ _tasks[j][i].first->debug_string());
}
}
@@ -1988,16 +1970,16 @@
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
return nullptr;
}
- for (const auto& runtime_states : _task_runtime_states) {
- for (const auto& runtime_state : runtime_states) {
- if (runtime_state == nullptr || runtime_state->runtime_profile()
== nullptr) {
+ for (const auto& tasks : _tasks) {
+ for (const auto& task : tasks) {
+ if (task.second->runtime_profile() == nullptr) {
continue;
}
auto tmp_load_channel_profile =
std::make_shared<TRuntimeProfileTree>();
-
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
-
_runtime_state->profile_level());
+
task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
+
_runtime_state->profile_level());
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index f3ee112b0a0..81b3f57b01f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,7 +117,7 @@ public:
void clear_finished_tasks() {
for (size_t j = 0; j < _tasks.size(); j++) {
for (size_t i = 0; i < _tasks[j].size(); i++) {
- _tasks[j][i]->stop_if_finished();
+ _tasks[j][i].first->stop_if_finished();
}
}
}
@@ -228,8 +228,25 @@ private:
bool _use_serial_source = false;
OperatorPtr _root_op = nullptr;
- // this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
- std::vector<std::vector<std::shared_ptr<PipelineTask>>> _tasks;
+ //
+ /**
+ * Matrix stores tasks with local runtime states.
+ * This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
+ *
+ * 2-D matrix:
+ * +-------------------------+------------+-------+
+ * |            | Pipeline 0 | Pipeline 1 |  ...  |
+ * +------------+------------+------------+-------+
+ * | Instance 0 |  task 0-0  |  task 0-1  |  ...  |
+ * +------------+------------+------------+-------+
+ * | Instance 1 |  task 1-0  |  task 1-1  |  ...  |
+ * +------------+------------+------------+-------+
+ * |                     ...                      |
+ * +--------------------------------------+-------+
+ */
+ std::vector<
+ std::vector<std::pair<std::shared_ptr<PipelineTask>,
std::unique_ptr<RuntimeState>>>>
+ _tasks;
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
@@ -299,21 +316,6 @@ private:
// - _task_runtime_states is at the task level, unique to each task.
std::vector<TUniqueId> _fragment_instance_ids;
- /**
- * Local runtime states for each task.
- *
- * 2-D matrix:
- * +-------------------------+------------+-------+
- * | | Instance 0 | Instance 1 | ... |
- * +------------+------------+------------+-------+
- * | Pipeline 0 | task 0-0 | task 0-1 | ... |
- * +------------+------------+------------+-------+
- * | Pipeline 1 | task 1-0 | task 1-1 | ... |
- * +------------+------------+------------+-------+
- * | ... |
- * +--------------------------------------+-------+
- */
- std::vector<std::vector<std::unique_ptr<RuntimeState>>>
_task_runtime_states;
// Total instance num running on all BEs
int _total_instances = -1;
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 2da386e83a1..e069cb840d1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -735,10 +735,15 @@ std::string PipelineTask::debug_string() {
return fmt::to_string(debug_string_buffer);
}
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
- fmt::format_to(debug_string_buffer,
- " elapse time = {}s, block dependency = [{}]\noperators: ",
elapsed,
+ fmt::format_to(debug_string_buffer, " elapse time = {}s, block dependency
= [{}]\n", elapsed,
cur_blocked_dep && !is_finalized() ?
cur_blocked_dep->debug_string() : "NULL");
+ if (_state && _state->local_runtime_filter_mgr()) {
+ fmt::format_to(debug_string_buffer, "local_runtime_filter_mgr: [{}]\n",
+ _state->local_runtime_filter_mgr()->debug_string());
+ }
+
+ fmt::format_to(debug_string_buffer, "operators: ");
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !is_finalized()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]