This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 35f2c55989f [pipeline] Proactively pause query for spill under memory
pressure in PipelineTask
35f2c55989f is described below
commit 35f2c55989fdaaa17ef1c845ac682856a497b063
Author: Hu Shenggang <[email protected]>
AuthorDate: Sat Feb 28 16:20:11 2026 +0800
[pipeline] Proactively pause query for spill under memory pressure in
PipelineTask
- add `_should_trigger_revoking(reserve_size)` to detect high memory
pressure from query usage and workload group watermark
- trigger `add_paused_query(..., QUERY_MEMORY_EXCEEDED)` before reserve
attempt when revocable memory is significant
- apply `low_memory_mode` to all operators in pipeline (instead of only
root)
- refine reserve path in `_try_to_reserve_memory()`:
- compute revocable memory only when reserve fails or
`enable_force_spill` is on
- keep force-spill guard based on max operator revocable size
- remove temporary debug logs in execute/reserve path
- minor cleanup: use `const auto&` iteration in `get_revocable_size()`
---
be/src/pipeline/pipeline_task.cpp | 133 ++++++++++++++++++++++++++++++++------
be/src/pipeline/pipeline_task.h | 1 +
2 files changed, 115 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 93e0c9a1485..0ba4beee496 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -367,6 +367,66 @@ void PipelineTask::terminate() {
}
}
+// Decide whether the query should be paused proactively so that revocable memory can be spilled
+// before this task tries to reserve `reserve_size` more bytes.
+//
+// When current memory pressure is low, memory usage may increase significantly in the next
+// operator run, while there is no revocable memory available for spilling.
+// Trigger memory revoking when pressure is high and revocable memory is significant.
+//
+// Memory pressure is evaluated using two signals:
+// 1. Query memory usage (plus the reserve request scaled by the pipeline parallelism) exceeds a
+//    threshold ratio of the query memory limit. The ratio is the workload group low watermark
+//    clamped to [10, 50] percent.
+// 2. Workload group memory usage reaches the workload group low- or high-watermark threshold.
+//
+// @param reserve_size bytes this task is about to reserve for its next run.
+// @return true only if spill is enabled, a positive effective memory limit is known, pressure is
+//         high, and the estimated revocable memory (this task's sink + operators, scaled by the
+//         pipeline parallelism) is at least 20% of the effective query limit.
+bool PipelineTask::_should_trigger_revoking(const size_t reserve_size) const {
+    // Bounds applied to the workload group low watermark (percent of the query limit).
+    constexpr int kMinWatermarkPercent = 10;
+    constexpr int kMaxWatermarkPercent = 50;
+    // Revocable memory below this fraction of the query limit is not worth pausing for.
+    constexpr double kMinRevocableRatio = 0.2;
+
+    if (!_state->enable_spill()) {
+        return false;
+    }
+
+    auto query_mem_tracker = _state->get_query_ctx()->query_mem_tracker();
+    auto wg = _state->get_query_ctx()->workload_group();
+    if (!query_mem_tracker || !wg) {
+        return false;
+    }
+
+    // All size arithmetic below is done in int64_t: mixing the signed tracker values with size_t
+    // would silently convert to unsigned, so a (transiently) negative consumption would wrap to
+    // a huge value and be reported as high pressure.
+    const int64_t parallelism = std::max(1, _pipeline->num_tasks());
+    const auto water_mark = std::max(std::min(wg->memory_low_watermark(), kMaxWatermarkPercent),
+                                     kMinWatermarkPercent);
+
+    // Effective limit: the query limit, capped by the workload group limit when that is set;
+    // fall back to the group limit when the query itself has no positive limit.
+    const auto group_mem_limit = wg->memory_limit();
+    auto query_limit = query_mem_tracker->limit();
+    if (query_limit <= 0 || (group_mem_limit > 0 && query_limit > group_mem_limit)) {
+        query_limit = group_mem_limit;
+    }
+    if (query_limit <= 0) {
+        return false;
+    }
+
+    // Signal 1: projected query usage against the watermark ratio of the effective limit.
+    const int64_t used_mem =
+            query_mem_tracker->consumption() + static_cast<int64_t>(reserve_size) * parallelism;
+    const auto pressure_threshold = static_cast<int64_t>(double(query_limit) * water_mark / 100);
+    bool is_high_memory_pressure = used_mem >= pressure_threshold;
+
+    // Signal 2: only consulted when signal 1 did not fire.
+    if (!is_high_memory_pressure) {
+        // Initialized so the result is well-defined even if check_mem_used leaves one untouched.
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        is_high_memory_pressure = is_low_watermark || is_high_watermark;
+    }
+
+    if (!is_high_memory_pressure) {
+        return false;
+    }
+
+    // Revocable memory held by this task: the sink plus every operator in the pipeline.
+    size_t revocable_size = _sink->revocable_mem_size(_state);
+    for (const auto& op : _operators) {
+        revocable_size += op->revocable_mem_size(_state);
+    }
+
+    // Extrapolate this task's revocable memory to all parallel tasks of the pipeline.
+    const int64_t total_estimated_revocable = static_cast<int64_t>(revocable_size) * parallelism;
+    return total_estimated_revocable >=
+           static_cast<int64_t>(double(query_limit) * kMinRevocableRatio);
+}
+
/**
* `_eos` indicates whether the execution phase is done. `done` indicates
whether we could close
* this task.
@@ -514,7 +574,9 @@ Status PipelineTask::execute(bool* done) {
SCOPED_TIMER(_get_block_timer);
if (_state->low_memory_mode()) {
_sink->set_low_memory_mode(_state);
- _root->set_low_memory_mode(_state);
+ for (auto& op : _operators) {
+ op->set_low_memory_mode(_state);
+ }
}
DEFER_RELEASE_RESERVED();
_get_block_counter->update(1);
@@ -525,14 +587,26 @@ Status PipelineTask::execute(bool* done) {
reserve_size += op->get_reserve_mem_size(_state);
op->reset_reserve_mem_size(_state);
}
- LOG(INFO) << "1 " << reserve_size;
if (workload_group &&
_state->get_query_ctx()
->resource_ctx()
->task_controller()
->is_enable_reserve_memory() &&
reserve_size > 0) {
- LOG(INFO) << "11 " << reserve_size;
+ if (_should_trigger_revoking(reserve_size)) {
+ LOG(INFO) << fmt::format(
+ "Query: {} sink: {}, node id: {}, task id: "
+ "{} when high memory pressure, try to spill",
+ print_id(_query_id), _sink->get_name(),
_sink->node_id(),
+ _state->task_id());
+
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+
_state->get_query_ctx()->resource_ctx()->shared_from_this(),
+ reserve_size,
+ Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+ "high memory pressure, try to spill"));
+ _spilling = true;
+ continue;
+ }
if (!_try_to_reserve_memory(reserve_size, _root)) {
continue;
}
@@ -555,7 +629,21 @@ Status PipelineTask::execute(bool* done) {
workload_group && !(_wake_up_early || _dry_run)) {
const auto sink_reserve_size =
_sink->get_reserve_mem_size(_state, _eos);
- LOG(INFO) << "2 " << sink_reserve_size;
+ if (sink_reserve_size > 0 &&
_should_trigger_revoking(sink_reserve_size)) {
+ LOG(INFO) << fmt::format(
+ "Query: {} sink: {}, node id: {}, task id: "
+ "{} when high memory pressure, try to spill",
+ print_id(_query_id), _sink->get_name(),
_sink->node_id(),
+ _state->task_id());
+
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+
_state->get_query_ctx()->resource_ctx()->shared_from_this(),
+ sink_reserve_size,
+ Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+ "high memory pressure, try to spill"));
+ _spilling = true;
+ continue;
+ }
+
if (sink_reserve_size > 0 &&
!_try_to_reserve_memory(sink_reserve_size, _sink.get())) {
continue;
@@ -666,7 +754,6 @@ Status PipelineTask::do_revoke_memory(const
std::shared_ptr<SpillContext>& spill
}
bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size,
OperatorBase* op) {
- LOG(INFO) << "111 " << reserve_size;
auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
// If reserve memory failed and the query is not enable spill, just
disable reserve memory(this will enable
// memory hard limit check, and will cancel the query if allocate memory
failed) and let it run.
@@ -678,24 +765,32 @@ bool PipelineTask::_try_to_reserve_memory(const size_t
reserve_size, OperatorBas
}
COUNTER_UPDATE(_memory_reserve_times, 1);
// Compute total revocable memory across all operators and the sink.
- size_t total_revocable_mem_size = _sink->revocable_mem_size(_state);
- size_t operator_max_revocable_mem_size = total_revocable_mem_size;
- for (auto& cur_op : _operators) {
- total_revocable_mem_size += cur_op->revocable_mem_size(_state);
- if (cur_op->revocable_mem_size(_state) >
operator_max_revocable_mem_size) {
- operator_max_revocable_mem_size =
cur_op->revocable_mem_size(_state);
+ size_t total_revocable_mem_size = 0;
+ size_t operator_max_revocable_mem_size = 0;
+
+ if (!st.ok() || _state->enable_force_spill()) {
+ // Compute total revocable memory across all operators and the sink.
+ total_revocable_mem_size = _sink->revocable_mem_size(_state);
+ operator_max_revocable_mem_size = total_revocable_mem_size;
+ for (auto& cur_op : _operators) {
+ total_revocable_mem_size += cur_op->revocable_mem_size(_state);
+ operator_max_revocable_mem_size =
+ std::max(cur_op->revocable_mem_size(_state),
operator_max_revocable_mem_size);
}
}
+
// During enable force spill, other operators like scan opeartor will also
try to reserve memory and will failed
// here, if not add this check, it will always paused and resumed again.
- if (st.ok() && _state->enable_force_spill() &&
- operator_max_revocable_mem_size >=
_state->minimum_operator_memory_required_bytes()) {
- st = Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
- "force spill and there is an operator has memory "
- "size {} exceeds min mem size {}",
- PrettyPrinter::print_bytes(operator_max_revocable_mem_size),
-
PrettyPrinter::print_bytes(_state->minimum_operator_memory_required_bytes()));
+ if (st.ok() && _state->enable_force_spill()) {
+ if (operator_max_revocable_mem_size >=
_state->minimum_operator_memory_required_bytes()) {
+ st = Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+ "force spill and there is an operator has memory "
+ "size {} exceeds min mem size {}",
+
PrettyPrinter::print_bytes(operator_max_revocable_mem_size),
+
PrettyPrinter::print_bytes(_state->minimum_operator_memory_required_bytes()));
+ }
}
+
if (!st.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
// build per-operator revocable memory info string for debugging
@@ -880,7 +975,7 @@ size_t PipelineTask::get_revocable_size() const {
// Sum revocable memory from every operator in the pipeline + the sink.
// Each operator reports only its own revocable memory (no child
recursion).
size_t total = _sink->revocable_mem_size(_state);
- for (auto& op : _operators) {
+ for (const auto& op : _operators) {
total += op->revocable_mem_size(_state);
}
return total;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index e2d51858be4..196a52fbe9e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -201,6 +201,7 @@ private:
// Operator `op` try to reserve memory before executing. Return false if
reserve failed
// otherwise return true.
bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
+ bool _should_trigger_revoking(const size_t reserve_size) const;
const TUniqueId _query_id;
const uint32_t _index;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]