This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bcfafcc759c [refactor](spill) Refine logics in pipeline task (#50010)
bcfafcc759c is described below

commit bcfafcc759cd686cc435e5b9758ffd73fbb46584
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Apr 14 21:55:14 2025 +0800

    [refactor](spill) Refine logics in pipeline task (#50010)
---
 be/src/pipeline/exec/operator.h   |   5 +-
 be/src/pipeline/pipeline_task.cpp | 130 +++++++++++++++-----------------------
 be/src/pipeline/pipeline_task.h   |   4 ++
 3 files changed, 57 insertions(+), 82 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 75a767aaa83..b8b6577a843 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -104,6 +104,7 @@ public:
     [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0;
     [[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
     [[nodiscard]] virtual Status close(RuntimeState* state);
+    [[nodiscard]] virtual int node_id() const = 0;
 
     [[nodiscard]] virtual Status set_child(OperatorPtr child) {
         if (_child && child != nullptr) {
@@ -625,7 +626,7 @@ public:
 
     [[nodiscard]] int nereids_id() const { return _nereids_id; }
 
-    [[nodiscard]] int node_id() const { return _node_id; }
+    [[nodiscard]] int node_id() const override { return _node_id; }
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
@@ -887,7 +888,7 @@ public:
     [[nodiscard]] virtual RowDescriptor& row_descriptor() { return 
_row_descriptor; }
 
     [[nodiscard]] int operator_id() const { return _operator_id; }
-    [[nodiscard]] int node_id() const { return _node_id; }
+    [[nodiscard]] int node_id() const override { return _node_id; }
     [[nodiscard]] int nereids_id() const { return _nereids_id; }
 
     [[nodiscard]] int64_t limit() const { return _limit; }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 3216c8e034d..f6988c175d4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -355,10 +355,6 @@ Status PipelineTask::execute(bool* done) {
                    (fragment_context->is_canceled() || !_is_pending_finish())) 
{
             *done = true;
         }
-        // If this run is pended by a spilling request, the block will be 
output in next run.
-        if (!_spilling) {
-            
_block->clear_column_data(_root->row_desc().num_materialized_slots());
-        }
     }};
     const auto query_id = _state->query_id();
     // If this task is already EOS and block is empty (which means we already 
output all blocks),
@@ -464,43 +460,8 @@ Status PipelineTask::execute(bool* done) {
 
             if (workload_group && 
_state->get_query_ctx()->enable_reserve_memory() &&
                 reserve_size > 0) {
-                auto st = 
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
-
-                COUNTER_UPDATE(_memory_reserve_times, 1);
-                if (!st.ok() && !_state->enable_force_spill()) {
-                    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    auto sink_revokable_mem_size = 
_sink->revocable_mem_size(_state);
-                    auto debug_msg = fmt::format(
-                            "Query: {} , try to reserve: {}, operator name: 
{}, operator "
-                            "id: {}, task id: {}, root revocable mem size: {}, 
sink revocable mem"
-                            "size: {}, failed: {}",
-                            print_id(query_id), 
PrettyPrinter::print_bytes(reserve_size),
-                            _root->get_name(), _root->node_id(), 
_state->task_id(),
-                            
PrettyPrinter::print_bytes(_root->revocable_mem_size(_state)),
-                            
PrettyPrinter::print_bytes(sink_revokable_mem_size), st.to_string());
-                    // PROCESS_MEMORY_EXCEEDED error msg alread contains 
process_mem_log_str
-                    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
-                        debug_msg += fmt::format(", debug info: {}",
-                                                 
GlobalMemoryArbitrator::process_mem_log_str());
-                    }
-                    LOG_EVERY_N(INFO, 100) << debug_msg;
-                    // If sink has enough revocable memory, trigger revoke 
memory
-                    if (sink_revokable_mem_size >= 
_state->spill_min_revocable_mem()) {
-                        LOG(INFO) << fmt::format(
-                                "Query: {} sink: {}, node id: {}, task id: "
-                                "{}, revocable mem size: {}",
-                                print_id(query_id), _sink->get_name(), 
_sink->node_id(),
-                                _state->task_id(),
-                                
PrettyPrinter::print_bytes(sink_revokable_mem_size));
-                        
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                                _state->get_query_ctx()->shared_from_this(), 
reserve_size, st);
-                        continue;
-                    } else {
-                        // If reserve failed, not add this query to paused 
list, because it is very small, will not
-                        // consume a lot of memory. But need set low memory 
mode to indicate that the system should
-                        // not use too much memory.
-                        _state->get_query_ctx()->set_low_memory_mode();
-                    }
+                if (!_try_to_reserve_memory(reserve_size, _root)) {
+                    continue;
                 }
             }
 
@@ -517,45 +478,8 @@ Status PipelineTask::execute(bool* done) {
             if (_state->get_query_ctx()->enable_reserve_memory() && 
workload_group &&
                 !(_wake_up_early || _dry_run)) {
                 const auto sink_reserve_size = 
_sink->get_reserve_mem_size(_state, _eos);
-                status = sink_reserve_size != 0
-                                 ? 
thread_context()->thread_mem_tracker_mgr->try_reserve(
-                                           sink_reserve_size)
-                                 : Status::OK();
-
-                auto sink_revocable_mem_size = 
_sink->revocable_mem_size(_state);
-                if (status.ok() && _state->enable_force_spill() && 
_sink->is_spillable() &&
-                    sink_revocable_mem_size >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                    status = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force 
Spill");
-                }
-
-                if (!status.ok()) {
-                    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    auto debug_msg = fmt::format(
-                            "Query: {} try to reserve: {}, sink name: {}, node 
id: {}, task "
-                            "id: "
-                            "{}, sink revocable mem size: {}, failed: {}",
-                            print_id(query_id), 
PrettyPrinter::print_bytes(sink_reserve_size),
-                            _sink->get_name(), _sink->node_id(), 
_state->task_id(),
-                            
PrettyPrinter::print_bytes(sink_revocable_mem_size),
-                            status.to_string());
-                    // PROCESS_MEMORY_EXCEEDED error msg alread contains 
process_mem_log_str
-                    if (!status.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
-                        debug_msg += fmt::format(", debug info: {}",
-                                                 
GlobalMemoryArbitrator::process_mem_log_str());
-                    }
-                    // If the operator is not spillable or it is spillable but 
not has much memory to spill
-                    // not need add to paused list, just let it go.
-                    if (sink_revocable_mem_size >=
-                        vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                        VLOG_DEBUG << debug_msg;
-                        
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                                _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size,
-                                status);
-                        _spilling = true;
-                        continue;
-                    } else {
-                        _state->get_query_ctx()->set_low_memory_mode();
-                    }
+                if (!_try_to_reserve_memory(sink_reserve_size, _sink.get())) {
+                    continue;
                 }
             }
 
@@ -617,6 +541,52 @@ Status PipelineTask::execute(bool* done) {
     return Status::OK();
 }
 
+// Try to reserve `reserve_size` bytes in the current thread's memory tracker on behalf of
+// operator `op` (the task's root operator or its sink) before `op` runs.
+//
+// Returns false only when the reservation failed (or force-spill turned a success into a
+// QUERY_MEMORY_EXCEEDED status) AND the sink holds at least MIN_SPILL_WRITE_BATCH_MEM bytes
+// of revocable memory. In that case the query is added to the workload group's paused list,
+// `_spilling` is set, and the caller is expected to yield (both call sites `continue`).
+// In every other case it returns true. If the reservation failed but the sink has too little
+// revocable memory to be worth spilling, the query is switched to low memory mode instead.
+bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) {
+    // A zero-byte request is not sent to the tracker (the previous sink-side code skipped it
+    // too). The force-spill check below still runs because it does not depend on the size.
+    Status st = Status::OK();
+    if (reserve_size > 0) {
+        st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
+        COUNTER_UPDATE(_memory_reserve_times, 1);
+    }
+    // This is always the sink's revocable byte count. The previous ternary mixed a size with
+    // a `Status`, which is not a valid conditional expression.
+    const auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
+    if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
+        sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+        st = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill");
+    }
+    if (st.ok()) {
+        return true;
+    }
+
+    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
+    // `op` may be either the root or the sink, so its size is labelled "operator", not "root".
+    auto debug_msg = fmt::format(
+            "Query: {} , try to reserve: {}, operator name: {}, operator "
+            "id: {}, task id: {}, operator revocable mem size: {}, sink revocable mem "
+            "size: {}, failed: {}",
+            print_id(_query_id), PrettyPrinter::print_bytes(reserve_size), op->get_name(),
+            op->node_id(), _state->task_id(),
+            PrettyPrinter::print_bytes(op->revocable_mem_size(_state)),
+            PrettyPrinter::print_bytes(sink_revocable_mem_size), st.to_string());
+    // A PROCESS_MEMORY_EXCEEDED error message already contains process_mem_log_str.
+    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+        debug_msg +=
+                fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
+    }
+    LOG_EVERY_N(INFO, 100) << debug_msg;
+
+    // The sink has enough revocable memory: pause the query so that memory can be revoked.
+    if (sink_revocable_mem_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+        LOG(INFO) << fmt::format(
+                "Query: {} sink: {}, node id: {}, task id: "
+                "{}, revocable mem size: {}",
+                print_id(_query_id), _sink->get_name(), _sink->node_id(), _state->task_id(),
+                PrettyPrinter::print_bytes(sink_revocable_mem_size));
+        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                _state->get_query_ctx()->shared_from_this(), reserve_size, st);
+        _spilling = true;
+        return false;
+    }
+    // The reservation failed but there is too little revocable memory to be worth spilling.
+    // The query is therefore not paused. Low memory mode is set to tell the system that this
+    // query should not use too much memory.
+    _state->get_query_ctx()->set_low_memory_mode();
+    return true;
+}
+
 void PipelineTask::stop_if_finished() {
     auto fragment = _fragment_context.lock();
     if (!fragment) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 36a85f7321e..4615e0869e1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -218,6 +218,10 @@ private:
     void _fresh_profile_counter();
     Status _open();
 
+    // Operator `op` tries to reserve memory before executing. Returns false if the reservation
+    // failed and the query was paused so that memory can be revoked; otherwise returns true.
+    bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
+
     const TUniqueId _query_id;
     const uint32_t _index;
     PipelinePtr _pipeline;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to