This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e149139837 [fix](load_stream) Fix flush token deadlock by ensuring wait_for_flush_tasks is called before destruction (#60284)
1e149139837 is described below

commit 1e1491398376b23561828e83719838c8d4cfcbc6
Author: Xin Liao <[email protected]>
AuthorDate: Thu Jan 29 22:49:34 2026 +0800

    [fix](load_stream) Fix flush token deadlock by ensuring wait_for_flush_tasks is called before destruction (#60284)
    
    When a TabletStream is destroyed without pre_close() having been called
    (e.g., in the on_idle_timeout scenario), the _flush_token destructor
    calls shutdown(), which triggers deadlock detection if it runs on one of
    the flush pool's own threads.
    
    Root cause:
    - on_idle_timeout() directly calls brpc::StreamClose() without calling
    LoadStream::close()
    - This triggers the destruction chain without calling pre_close() on
    TabletStreams
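    - If flush tasks are still running, the last reference to a TabletStream
    may be the one a flush task captured via shared_from_this(), so the
    TabletStream ends up being destroyed on a pool thread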
    
    Solution:
    - Add IndexStream::~IndexStream() to ensure wait_for_flush_tasks() is
    called on all TabletStreams
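    - Add TabletStream::wait_for_flush_tasks() to wait for all flush tasks
    to complete (or, if the stream has already failed, shut the flush token
    down); calls after the first return immediately
    - This ensures _flush_token has no running tasks before a TabletStream
    is destroyed
    - Revert #60148 (shared_from_this), which is no longer needed: flush
    tasks capture `this` again, and ~IndexStream() waits for them before its
    TabletStreams are released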
---
 be/src/runtime/load_stream.cpp | 74 +++++++++++++++++++++++++++---------------
 be/src/runtime/load_stream.h   |  7 +++-
 2 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 6a291d143ff..0a735184b0b 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -148,14 +148,12 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     uint32_t new_segid = mapping->at(segid);
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
-    auto self = shared_from_this();
-    auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable 
{
-        signal::set_signal_task_id(self->_load_id);
+    auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable 
{
+        signal::set_signal_task_id(_load_id);
         g_load_stream_flush_running_threads << -1;
-        auto st =
-                self->_load_stream_writer->append_data(new_segid, 
header.offset(), buf, file_type);
+        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf, file_type);
         if (!st.ok() && !config::is_cloud_mode()) {
-            auto res = ExecEnv::get_tablet(self->_id);
+            auto res = ExecEnv::get_tablet(_id);
             TabletSharedPtr tablet =
                     res.has_value() ? 
std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
             if (tablet) {
@@ -166,7 +164,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
             DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
                             { file_type = static_cast<FileType>(-1); });
             if (file_type == FileType::SEGMENT_FILE || file_type == 
FileType::INVERTED_INDEX_FILE) {
-                st = self->_load_stream_writer->close_writer(new_segid, 
file_type);
+                st = _load_stream_writer->close_writer(new_segid, file_type);
             } else {
                 st = Status::InternalError(
                         "appent data failed, file type error, file type = {}, "
@@ -177,8 +175,8 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
         DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            self->_status.update(st);
-            LOG(WARNING) << "write data failed " << st << ", " << *self;
+            _status.update(st);
+            LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
     auto load_stream_flush_token_max_tasks = 
config::load_stream_flush_token_max_tasks;
@@ -250,15 +248,14 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
     }
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
 
-    auto self = shared_from_this();
-    auto add_segment_func = [self, new_segid, stat]() {
-        signal::set_signal_task_id(self->_load_id);
-        auto st = self->_load_stream_writer->add_segment(new_segid, stat);
+    auto add_segment_func = [this, new_segid, stat]() {
+        signal::set_signal_task_id(_load_id);
+        auto st = _load_stream_writer->add_segment(new_segid, stat);
         DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            self->_status.update(st);
-            LOG(INFO) << "add segment failed " << *self;
+            _status.update(st);
+            LOG(INFO) << "add segment failed " << *this;
         }
     };
     Status st = Status::OK();
@@ -278,9 +275,8 @@ Status 
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     std::unique_lock<bthread::Mutex> lock(mu);
     bthread::ConditionVariable cv;
     auto st = Status::OK();
-    auto self = shared_from_this();
-    auto func = [self, &mu, &cv, &st, &fn] {
-        signal::set_signal_task_id(self->_load_id);
+    auto func = [this, &mu, &cv, &st, &fn] {
+        signal::set_signal_task_id(_load_id);
         st = fn();
         std::lock_guard<bthread::Mutex> lock(mu);
         cv.notify_one();
@@ -294,21 +290,37 @@ Status 
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     return st;
 }
 
-void TabletStream::pre_close() {
+void TabletStream::wait_for_flush_tasks() {
+    {
+        std::lock_guard lock_guard(_lock);
+        if (_flush_tasks_done) {
+            return;
+        }
+        _flush_tasks_done = true;
+    }
+
     if (!_status.ok()) {
-        // cancel all pending tasks, wait all running tasks to finish
         _flush_token->shutdown();
         return;
     }
 
-    SCOPED_TIMER(_close_wait_timer);
-    _status.update(_run_in_heavy_work_pool([this]() {
+    // Use heavy_work_pool to avoid blocking bthread
+    auto st = _run_in_heavy_work_pool([this]() {
         _flush_token->wait();
         return Status::OK();
-    }));
-    // it is necessary to check status after wait_func,
-    // for create_rowset could fail during add_segment when loading to MOW 
table,
-    // in this case, should skip close to avoid submit_calc_delete_bitmap_task 
which could cause coredump.
+    });
+    if (!st.ok()) {
+        // If heavy_work_pool is unavailable, fall back to shutdown
+        // which will cancel pending tasks and wait for running tasks
+        _flush_token->shutdown();
+        _status.update(st);
+    }
+}
+
+void TabletStream::pre_close() {
+    SCOPED_TIMER(_close_wait_timer);
+    wait_for_flush_tasks();
+
     if (!_status.ok()) {
         return;
     }
@@ -347,6 +359,16 @@ IndexStream::IndexStream(const PUniqueId& load_id, int64_t 
id, int64_t txn_id,
     _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
 }
 
+IndexStream::~IndexStream() {
+    // Ensure all TabletStreams have their flush tokens properly handled 
before destruction.
+    // In normal flow, close() should have called pre_close() on all tablet 
streams.
+    // But if IndexStream is destroyed without close() being called (e.g., 
on_idle_timeout),
+    // we need to wait for flush tasks here to ensure flush tokens are 
properly shut down.
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        tablet_stream->wait_for_flush_tasks();
+    }
+}
+
 Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* 
data) {
     SCOPED_TIMER(_append_data_timer);
     int64_t tablet_id = header.tablet_id();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 1dd45d74650..caf67815be9 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
 // origin_segid(index) -> new_segid(value in vector)
 using SegIdMapping = std::vector<uint32_t>;
 using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream : public std::enable_shared_from_this<TabletStream> {
+class TabletStream {
 public:
     TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                  LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
@@ -54,6 +54,9 @@ public:
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     void add_num_segments(int64_t num_segments) { _num_segments += 
num_segments; }
     void disable_num_segments_check() { _check_num_segments = false; }
+    // Wait for all pending flush tasks to complete and shut down the flush 
token.
+    // Safe to call multiple times.
+    void wait_for_flush_tasks();
     void pre_close();
     Status close();
     int64_t id() const { return _id; }
@@ -70,6 +73,7 @@ private:
     std::atomic<uint32_t> _next_segid;
     int64_t _num_segments = 0;
     bool _check_num_segments = true;
+    bool _flush_tasks_done = false;
     bthread::Mutex _lock;
     AtomicStatus _status;
     PUniqueId _load_id;
@@ -88,6 +92,7 @@ public:
     IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                 std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* 
load_stream_mgr,
                 RuntimeProfile* profile);
+    ~IndexStream();
 
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to