This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new fe3b07a998c branch-4.0: [fix](pipeline) Crashing caused by repeated 
spill operations #56755 (#57039)
fe3b07a998c is described below

commit fe3b07a998cc8b9a50baf1deb99a4961b9ce8208
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 18:10:53 2025 +0800

    branch-4.0: [fix](pipeline) Crashing caused by repeated spill operations 
#56755 (#57039)
    
    Cherry-picked from #56755
    
    Co-authored-by: Jerry Hu <[email protected]>
---
 be/src/pipeline/exec/spill_utils.h |  4 +-
 be/src/pipeline/pipeline_task.cpp  | 73 +++++++++++++++++++++++-------------
 be/src/pipeline/pipeline_task.h    | 40 +++++++++++---------
 be/src/pipeline/revokable_task.h   | 76 ++++++++++++++++++++++++++++++++++++++
 be/src/pipeline/task_scheduler.cpp | 11 +++++-
 5 files changed, 157 insertions(+), 47 deletions(-)

diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
index d6f2a811f3a..beb2ecaa984 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -201,8 +201,8 @@ protected:
     }
 
     void _on_task_started() override {
-        LOG(INFO) << "SpillRecoverRunnable, Query: " << 
print_id(_state->query_id())
-                  << " spill task started, pipeline task id: " << 
_state->task_id();
+        VLOG_DEBUG << "SpillRecoverRunnable, Query: " << 
print_id(_state->query_id())
+                   << " spill task started, pipeline task id: " << 
_state->task_id();
         COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
         COUNTER_UPDATE(_reading_task_count, 1);
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index e41af301a73..7fe201f3802 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -23,6 +23,7 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <memory>
 #include <ostream>
 #include <vector>
 
@@ -35,6 +36,7 @@
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
+#include "revokable_task.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
@@ -99,14 +101,15 @@ PipelineTask::~PipelineTask() {
 // But pipeline task hold some objects, like operators, shared state, etc. So 
that should release
 // memory manually.
 #ifndef BE_TEST
-    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+    if (_query_mem_tracker) {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+    }
 #endif
     _shared_state_map.clear();
     _sink_shared_state.reset();
     _op_shared_states.clear();
     _sink.reset();
     _operators.clear();
-    _spill_context.reset();
     _block.reset();
     _pipeline.reset();
 }
@@ -309,17 +312,12 @@ bool PipelineTask::is_blockable() const {
         }
     }
 
-    return _need_to_revoke_memory ||
-           std::ranges::any_of(_operators,
+    return std::ranges::any_of(_operators,
                                [&](OperatorPtr op) -> bool { return 
op->is_blockable(_state); }) ||
            _sink->is_blockable(_state);
 }
 
 bool PipelineTask::_is_blocked() {
-    if (_need_to_revoke_memory) {
-        return false;
-    }
-
     // `_dry_run = true` means we do not need data from source operator.
     if (!_dry_run) {
         for (int i = cast_set<int>(_read_dependencies.size() - 1); i >= 0; 
i--) {
@@ -381,11 +379,15 @@ void PipelineTask::terminate() {
  * @return
  */
 Status PipelineTask::execute(bool* done) {
-    if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE || 
_blocked_dep != nullptr))
-            [[unlikely]] {
+    if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) 
[[unlikely]] {
+#ifdef BE_TEST
         return Status::InternalError("Pipeline task is not runnable! Task 
info: {}",
                                      debug_string());
+#else
+        return Status::FatalError("Pipeline task is not runnable! Task info: 
{}", debug_string());
+#endif
     }
+
     auto fragment_context = _fragment_context.lock();
     if (!fragment_context) {
         return Status::InternalError("Fragment already finished! Query: {}", 
print_id(_query_id));
@@ -480,11 +482,6 @@ Status PipelineTask::execute(bool* done) {
             break;
         }
 
-        if (_need_to_revoke_memory) {
-            _need_to_revoke_memory = false;
-            return _sink->revoke_memory(_state, _spill_context);
-        }
-
         if (time_spent > _exec_time_slice) {
             COUNTER_UPDATE(_yield_counts, 1);
             break;
@@ -613,6 +610,33 @@ Status PipelineTask::execute(bool* done) {
     return Status::OK();
 }
 
+Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context) {
+    auto fragment_context = _fragment_context.lock();
+    if (!fragment_context) {
+        return Status::InternalError("Fragment already finished! Query: {}", 
print_id(_query_id));
+    }
+
+    SCOPED_ATTACH_TASK(_state);
+    ThreadCpuStopWatch cpu_time_stop_watch;
+    cpu_time_stop_watch.start();
+    Defer running_defer {[&]() {
+        int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+        _task_cpu_timer->update(delta_cpu_time);
+        
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+                delta_cpu_time);
+
+        // If the task was woken up early, we should terminate all operators, and 
this task can be closed immediately.
+        if (_wake_up_early) {
+            terminate();
+            THROW_IF_ERROR(_root->terminate(_state));
+            THROW_IF_ERROR(_sink->terminate(_state));
+            _eos = true;
+        }
+    }};
+
+    return _sink->revoke_memory(_state, spill_context);
+}
+
 bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, 
OperatorBase* op) {
     auto st = 
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
     COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -797,7 +821,7 @@ std::string PipelineTask::debug_string() {
 }
 
 size_t PipelineTask::get_revocable_size() const {
-    if (is_finalized() || _running || (_eos && !_spilling)) {
+    if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
         return 0;
     }
 
@@ -805,22 +829,19 @@ size_t PipelineTask::get_revocable_size() const {
 }
 
 Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context) {
+    DCHECK(spill_context);
     if (is_finalized()) {
-        if (spill_context) {
-            spill_context->on_task_finished();
-            VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", 
task: " << ((void*)this)
-                       << " finalized";
-        }
+        spill_context->on_task_finished();
+        VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " 
<< ((void*)this)
+                   << " finalized";
         return Status::OK();
     }
 
     const auto revocable_size = _sink->revocable_mem_size(_state);
     if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        _need_to_revoke_memory = true;
-        _spill_context = spill_context;
-        RETURN_IF_ERROR(
-                
_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
-    } else if (spill_context) {
+        auto revokable_task = 
std::make_shared<RevokableTask>(shared_from_this(), spill_context);
+        
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task));
+    } else {
         spill_context->on_task_finished();
         LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " 
<< ((void*)this)
                   << " has not enough data to revoke: " << revocable_size;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 08b57e0c630..8d5bd6e99ef 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -55,24 +55,24 @@ public:
                          shared_state_map,
                  int task_idx);
 
-    ~PipelineTask();
+    virtual ~PipelineTask();
 
     Status prepare(const std::vector<TScanRangeParams>& scan_range, const int 
sender_id,
                    const TDataSink& tsink);
 
-    Status execute(bool* done);
+    virtual Status execute(bool* done);
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    Status close(Status exec_status, bool close_sink = true);
+    virtual Status close(Status exec_status, bool close_sink = true);
 
-    std::weak_ptr<PipelineFragmentContext>& fragment_context() { return 
_fragment_context; }
+    virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() { 
return _fragment_context; }
 
     int get_thread_id(int num_threads) const {
         return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
     }
 
-    PipelineTask& set_thread_id(int thread_id) {
+    virtual PipelineTask& set_thread_id(int thread_id) {
         _thread_id = thread_id;
         if (thread_id != _thread_id) {
             COUNTER_UPDATE(_core_change_times, 1);
@@ -80,7 +80,7 @@ public:
         return *this;
     }
 
-    Status finalize();
+    virtual Status finalize();
 
     std::string debug_string();
 
@@ -94,7 +94,7 @@ public:
      * Pipeline task is blockable means it will be blocked in the next run. So 
we should put it into
      * the blocking task scheduler.
      */
-    bool is_blockable() const;
+    virtual bool is_blockable() const;
 
     /**
      * `shared_state` is shared by different pipeline tasks. This function 
aims to establish
@@ -125,7 +125,7 @@ public:
     DataSinkOperatorPtr sink() const { return _sink; }
 
     int task_id() const { return _index; };
-    bool is_finalized() const { return _exec_state == State::FINALIZED; }
+    virtual bool is_finalized() const { return _exec_state == 
State::FINALIZED; }
 
     void set_wake_up_early(PipelineId wake_by = -1) {
         _wake_up_early = true;
@@ -153,19 +153,24 @@ public:
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
     bool is_running() { return _running.load(); }
-    PipelineTask& set_running(bool running) {
-        _running.exchange(running);
-        return *this;
+    virtual bool set_running(bool running) {
+        bool old_value = !running;
+        _running.compare_exchange_weak(old_value, running);
+        return old_value;
     }
 
-    RuntimeState* runtime_state() const { return _state; }
+    virtual RuntimeState* runtime_state() const { return _state; }
+
+    virtual std::string task_name() const {
+        return fmt::format("task{}({})", _index, _pipeline->_name);
+    }
 
-    std::string task_name() const { return fmt::format("task{}({})", _index, 
_pipeline->_name); }
+    [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context);
 
     // TODO: Maybe we do not need this safe code anymore
     void stop_if_finished();
 
-    PipelineId pipeline_id() const { return _pipeline->id(); }
+    virtual PipelineId pipeline_id() const { return _pipeline->id(); }
     [[nodiscard]] size_t get_revocable_size() const;
     [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& 
spill_context);
 
@@ -175,6 +180,10 @@ public:
         return _state_transition(PipelineTask::State::BLOCKED);
     }
 
+protected:
+    // Only used for RevokableTask
+    PipelineTask() : _index(0) {}
+
 private:
     // Whether this task is blocked before execution (FE 2-phase commit 
trigger, runtime filters)
     bool _wait_to_start();
@@ -214,9 +223,6 @@ private:
     // 3 update task statistics(update _queue_level/_core_id)
     int _queue_level = 0;
 
-    bool _need_to_revoke_memory = false;
-    std::shared_ptr<SpillContext> _spill_context;
-
     RuntimeProfile* _parent_profile = nullptr;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/revokable_task.h b/be/src/pipeline/revokable_task.h
new file mode 100644
index 00000000000..d4d253c2703
--- /dev/null
+++ b/be/src/pipeline/revokable_task.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline_task.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class PipelineFragmentContext;
+
+class RevokableTask : public PipelineTask {
+public:
+    RevokableTask(PipelineTaskSPtr task, std::shared_ptr<SpillContext> 
spill_context)
+            : _task(std::move(task)), _spill_context(std::move(spill_context)) 
{}
+
+    ~RevokableTask() override = default;
+
+    RuntimeState* runtime_state() const override { return 
_task->runtime_state(); }
+
+    Status close(Status exec_status, bool close_sink) override {
+        return _task->close(exec_status, close_sink);
+    }
+
+    Status finalize() override { return _task->finalize(); }
+
+    bool set_running(bool running) override { return 
_task->set_running(running); }
+
+    bool is_finalized() const override { return _task->is_finalized(); }
+
+    std::weak_ptr<PipelineFragmentContext>& fragment_context() override {
+        return _task->fragment_context();
+    }
+
+    PipelineTask& set_thread_id(int thread_id) override { return 
_task->set_thread_id(thread_id); }
+
+    PipelineId pipeline_id() const override { return _task->pipeline_id(); }
+
+    std::string task_name() const override { return _task->task_name(); }
+
+    Status execute(bool* done) override { return 
_task->do_revoke_memory(_spill_context); }
+
+    bool is_blockable() const override { return true; }
+
+private:
+    PipelineTaskSPtr _task;
+    std::shared_ptr<SpillContext> _spill_context;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 794a08155d1..228335d1aa0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) {
         // The task is already running, maybe block in now dependency wake up 
by other thread
         // but the block thread still hold the task, so put it back to the 
queue, until the hold
         // thread set task->set_running(false)
-        if (task->is_running()) {
+        // set_running returns the old value
+        if (task->set_running(true)) {
             static_cast<void>(_task_queue.push_back(task, index));
             continue;
         }
+
         if (task->is_finalized()) {
+            task->set_running(false);
             continue;
         }
+
         auto fragment_context = task->fragment_context().lock();
         if (!fragment_context) {
             // Fragment already finished
+            task->set_running(false);
             continue;
         }
-        task->set_running(true).set_thread_id(index);
+
+        task->set_thread_id(index);
+
         bool done = false;
         auto status = Status::OK();
         int64_t exec_ns = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to