This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new fe3b07a998c branch-4.0: [fix](pipeline) Crashing caused by repeated
spill operations #56755 (#57039)
fe3b07a998c is described below
commit fe3b07a998cc8b9a50baf1deb99a4961b9ce8208
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 18:10:53 2025 +0800
branch-4.0: [fix](pipeline) Crashing caused by repeated spill operations
#56755 (#57039)
Cherry-picked from #56755
Co-authored-by: Jerry Hu <[email protected]>
---
be/src/pipeline/exec/spill_utils.h | 4 +-
be/src/pipeline/pipeline_task.cpp | 73 +++++++++++++++++++++++-------------
be/src/pipeline/pipeline_task.h | 40 +++++++++++---------
be/src/pipeline/revokable_task.h | 76 ++++++++++++++++++++++++++++++++++++++
be/src/pipeline/task_scheduler.cpp | 11 +++++-
5 files changed, 157 insertions(+), 47 deletions(-)
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index d6f2a811f3a..beb2ecaa984 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -201,8 +201,8 @@ protected:
}
void _on_task_started() override {
- LOG(INFO) << "SpillRecoverRunnable, Query: " <<
print_id(_state->query_id())
- << " spill task started, pipeline task id: " <<
_state->task_id();
+ VLOG_DEBUG << "SpillRecoverRunnable, Query: " <<
print_id(_state->query_id())
+ << " spill task started, pipeline task id: " <<
_state->task_id();
COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
COUNTER_UPDATE(_reading_task_count, 1);
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index e41af301a73..7fe201f3802 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -23,6 +23,7 @@
#include <glog/logging.h>
#include <algorithm>
+#include <memory>
#include <ostream>
#include <vector>
@@ -35,6 +36,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
+#include "revokable_task.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
@@ -99,14 +101,15 @@ PipelineTask::~PipelineTask() {
// But pipeline task hold some objects, like operators, shared state, etc. So
that should release
// memory manually.
#ifndef BE_TEST
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+ if (_query_mem_tracker) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+ }
#endif
_shared_state_map.clear();
_sink_shared_state.reset();
_op_shared_states.clear();
_sink.reset();
_operators.clear();
- _spill_context.reset();
_block.reset();
_pipeline.reset();
}
@@ -309,17 +312,12 @@ bool PipelineTask::is_blockable() const {
}
}
- return _need_to_revoke_memory ||
- std::ranges::any_of(_operators,
+ return std::ranges::any_of(_operators,
[&](OperatorPtr op) -> bool { return
op->is_blockable(_state); }) ||
_sink->is_blockable(_state);
}
bool PipelineTask::_is_blocked() {
- if (_need_to_revoke_memory) {
- return false;
- }
-
// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
for (int i = cast_set<int>(_read_dependencies.size() - 1); i >= 0;
i--) {
@@ -381,11 +379,15 @@ void PipelineTask::terminate() {
* @return
*/
Status PipelineTask::execute(bool* done) {
- if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE ||
_blocked_dep != nullptr))
- [[unlikely]] {
+ if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr)
[[unlikely]] {
+#ifdef BE_TEST
return Status::InternalError("Pipeline task is not runnable! Task
info: {}",
debug_string());
+#else
+ return Status::FatalError("Pipeline task is not runnable! Task info:
{}", debug_string());
+#endif
}
+
auto fragment_context = _fragment_context.lock();
if (!fragment_context) {
return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
@@ -480,11 +482,6 @@ Status PipelineTask::execute(bool* done) {
break;
}
- if (_need_to_revoke_memory) {
- _need_to_revoke_memory = false;
- return _sink->revoke_memory(_state, _spill_context);
- }
-
if (time_spent > _exec_time_slice) {
COUNTER_UPDATE(_yield_counts, 1);
break;
@@ -613,6 +610,33 @@ Status PipelineTask::execute(bool* done) {
return Status::OK();
}
+Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) { // Calls the sink's revoke_memory() with CPU-time accounting around it.
+ auto fragment_context = _fragment_context.lock(); // Promote the weak reference; null is reported below as "Fragment already finished".
+ if (!fragment_context) {
+ return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
+ }
+
+ SCOPED_ATTACH_TASK(_state);
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+ Defer running_defer {[&]() { // NOTE(review): presumably invoked when this scope exits, i.e. after revoke_memory() returns - confirm against Defer.
+ int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+ _task_cpu_timer->update(delta_cpu_time); // Charge the elapsed CPU time to this task's timer ...
+
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+ delta_cpu_time); // ... and to the query-level CPU context.
+
+ // If the task was woken up early, we should terminate all operators, and
this task could be closed immediately.
+ if (_wake_up_early) {
+ terminate();
+ THROW_IF_ERROR(_root->terminate(_state));
+ THROW_IF_ERROR(_sink->terminate(_state));
+ _eos = true;
+ }
+ }};
+
+ return _sink->revoke_memory(_state, spill_context); // The revoke itself is delegated to the sink operator.
+}
+
bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size,
OperatorBase* op) {
auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -797,7 +821,7 @@ std::string PipelineTask::debug_string() {
}
size_t PipelineTask::get_revocable_size() const {
- if (is_finalized() || _running || (_eos && !_spilling)) {
+ if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
return 0;
}
@@ -805,22 +829,19 @@ size_t PipelineTask::get_revocable_size() const {
}
Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) {
+ DCHECK(spill_context);
if (is_finalized()) {
- if (spill_context) {
- spill_context->on_task_finished();
- VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ",
task: " << ((void*)this)
- << " finalized";
- }
+ spill_context->on_task_finished();
+ VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: "
<< ((void*)this)
+ << " finalized";
return Status::OK();
}
const auto revocable_size = _sink->revocable_mem_size(_state);
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- _need_to_revoke_memory = true;
- _spill_context = spill_context;
- RETURN_IF_ERROR(
-
_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
- } else if (spill_context) {
+ auto revokable_task =
std::make_shared<RevokableTask>(shared_from_this(), spill_context);
+
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task));
+ } else {
spill_context->on_task_finished();
LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: "
<< ((void*)this)
<< " has not enough data to revoke: " << revocable_size;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 08b57e0c630..8d5bd6e99ef 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -55,24 +55,24 @@ public:
shared_state_map,
int task_idx);
- ~PipelineTask();
+ virtual ~PipelineTask();
Status prepare(const std::vector<TScanRangeParams>& scan_range, const int
sender_id,
const TDataSink& tsink);
- Status execute(bool* done);
+ virtual Status execute(bool* done);
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
- Status close(Status exec_status, bool close_sink = true);
+ virtual Status close(Status exec_status, bool close_sink = true);
- std::weak_ptr<PipelineFragmentContext>& fragment_context() { return
_fragment_context; }
+ virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() {
return _fragment_context; }
int get_thread_id(int num_threads) const {
return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
}
- PipelineTask& set_thread_id(int thread_id) {
+ virtual PipelineTask& set_thread_id(int thread_id) {
_thread_id = thread_id;
if (thread_id != _thread_id) {
COUNTER_UPDATE(_core_change_times, 1);
@@ -80,7 +80,7 @@ public:
return *this;
}
- Status finalize();
+ virtual Status finalize();
std::string debug_string();
@@ -94,7 +94,7 @@ public:
* Pipeline task is blockable means it will be blocked in the next run. So
we should put it into
* the blocking task scheduler.
*/
- bool is_blockable() const;
+ virtual bool is_blockable() const;
/**
* `shared_state` is shared by different pipeline tasks. This function
aims to establish
@@ -125,7 +125,7 @@ public:
DataSinkOperatorPtr sink() const { return _sink; }
int task_id() const { return _index; };
- bool is_finalized() const { return _exec_state == State::FINALIZED; }
+ virtual bool is_finalized() const { return _exec_state ==
State::FINALIZED; }
void set_wake_up_early(PipelineId wake_by = -1) {
_wake_up_early = true;
@@ -153,19 +153,24 @@ public:
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
bool is_running() { return _running.load(); }
- PipelineTask& set_running(bool running) {
- _running.exchange(running);
- return *this;
+ // Atomically switches `_running` from `!running` to `running` and returns the
+ // value `_running` held before the call. A return value equal to `running`
+ // means the flag was already in the requested state and nothing was changed.
+ virtual bool set_running(bool running) {
+ bool old_value = !running;
+ // Use the strong form: compare_exchange_weak may fail spuriously even when
+ // `_running == old_value`. In that case `old_value` would still be `!running`,
+ // so the caller would be told the transition succeeded although the flag was
+ // never stored, and two workers could end up executing the same task.
+ _running.compare_exchange_strong(old_value, running);
+ return old_value;
+ }
- RuntimeState* runtime_state() const { return _state; }
+ virtual RuntimeState* runtime_state() const { return _state; }
+
+ virtual std::string task_name() const { // Human-readable name of the form "task<index>(<pipeline name>)".
+ return fmt::format("task{}({})", _index, _pipeline->_name);
+ }
- std::string task_name() const { return fmt::format("task{}({})", _index,
_pipeline->_name); }
+ [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
// TODO: Maybe we do not need this safe code anymore
void stop_if_finished();
- PipelineId pipeline_id() const { return _pipeline->id(); }
+ virtual PipelineId pipeline_id() const { return _pipeline->id(); }
[[nodiscard]] size_t get_revocable_size() const;
[[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
@@ -175,6 +180,10 @@ public:
return _state_transition(PipelineTask::State::BLOCKED);
}
+protected:
+ // Only used for RevokableTask
+ PipelineTask() : _index(0) {}
+
private:
// Whether this task is blocked before execution (FE 2-phase commit
trigger, runtime filters)
bool _wait_to_start();
@@ -214,9 +223,6 @@ private:
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
- bool _need_to_revoke_memory = false;
- std::shared_ptr<SpillContext> _spill_context;
-
RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/revokable_task.h b/be/src/pipeline/revokable_task.h
new file mode 100644
index 00000000000..d4d253c2703
--- /dev/null
+++ b/be/src/pipeline/revokable_task.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline_task.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class PipelineFragmentContext;
+
+class RevokableTask : public PipelineTask { // Wrapper around an existing PipelineTask whose execute() runs that task's do_revoke_memory().
+public:
+ RevokableTask(PipelineTaskSPtr task, std::shared_ptr<SpillContext>
spill_context)
+ : _task(std::move(task)), _spill_context(std::move(spill_context))
{} // Both arguments are moved into the members below.
+
+ ~RevokableTask() override = default;
+
+ RuntimeState* runtime_state() const override { return
_task->runtime_state(); }
+
+ Status close(Status exec_status, bool close_sink) override {
+ return _task->close(exec_status, close_sink);
+ }
+
+ Status finalize() override { return _task->finalize(); }
+
+ bool set_running(bool running) override { return
_task->set_running(running); } // Forwards to the wrapped task, so its running flag is the one that changes.
+
+ bool is_finalized() const override { return _task->is_finalized(); }
+
+ std::weak_ptr<PipelineFragmentContext>& fragment_context() override {
+ return _task->fragment_context();
+ }
+
+ PipelineTask& set_thread_id(int thread_id) override { return
_task->set_thread_id(thread_id); }
+
+ PipelineId pipeline_id() const override { return _task->pipeline_id(); }
+
+ std::string task_name() const override { return _task->task_name(); }
+
+ Status execute(bool* done) override { return
_task->do_revoke_memory(_spill_context); } // Runs the revoke only; `done` is never written here.
+
+ bool is_blockable() const override { return true; } // Always reported as blockable, regardless of the wrapped task.
+
+private:
+ PipelineTaskSPtr _task; // Wrapped task; every override except execute() and is_blockable() forwards to it.
+ std::shared_ptr<SpillContext> _spill_context; // Handed to do_revoke_memory() by execute().
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 794a08155d1..228335d1aa0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) {
// The task is already running, maybe block in now dependency wake up
by other thread
// but the block thread still hold the task, so put it back to the
queue, until the hold
// thread set task->set_running(false)
- if (task->is_running()) {
+ // set_running return the old value
+ if (task->set_running(true)) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
}
+
if (task->is_finalized()) {
+ task->set_running(false);
continue;
}
+
auto fragment_context = task->fragment_context().lock();
if (!fragment_context) {
// Fragment already finished
+ task->set_running(false);
continue;
}
- task->set_running(true).set_thread_id(index);
+
+ task->set_thread_id(index);
+
bool done = false;
auto status = Status::OK();
int64_t exec_ns = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]