This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7058b31edd465b53bc4525f4dc4188f1bbdf6f5c
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Mon May 27 15:29:38 2024 +0800

    [fix](move-memtable) clear load streams before shutdown 
SegmentFileWriterThreadPool (#35217)
---
 be/src/runtime/load_stream.h         |  2 +-
 be/src/runtime/load_stream_mgr.cpp   | 14 ++++++++------
 be/src/runtime/load_stream_mgr.h     |  5 ++---
 be/src/service/internal_service.cpp  |  4 ++--
 be/test/runtime/load_stream_test.cpp |  4 ++--
 5 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index be1cb7756a1..c61a2d163de 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -169,6 +169,6 @@ private:
     QueryThreadContext _query_thread_context;
 };
 
-using LoadStreamSharedPtr = std::shared_ptr<LoadStream>;
+using LoadStreamPtr = std::unique_ptr<LoadStream>;
 
 } // namespace doris
diff --git a/be/src/runtime/load_stream_mgr.cpp 
b/be/src/runtime/load_stream_mgr.cpp
index 8d9d37c5d3a..c99f69f5607 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -44,23 +44,25 @@ LoadStreamMgr::LoadStreamMgr(uint32_t 
segment_file_writer_thread_num,
 }
 
 LoadStreamMgr::~LoadStreamMgr() {
+    _load_streams_map.clear();
     _file_writer_thread_pool->shutdown();
 }
 
 Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
-                                       LoadStreamSharedPtr& load_stream) {
+                                       LoadStream*& load_stream) {
     UniqueId load_id(request->load_id());
 
     {
         std::lock_guard<decltype(_lock)> l(_lock);
         auto it = _load_streams_map.find(load_id);
         if (it != _load_streams_map.end()) {
-            load_stream = it->second;
+            load_stream = it->second.get();
         } else {
-            load_stream = std::make_shared<LoadStream>(request->load_id(), 
this,
-                                                       
request->enable_profile());
-            RETURN_IF_ERROR(load_stream->init(request));
-            _load_streams_map[load_id] = load_stream;
+            auto p = std::make_unique<LoadStream>(request->load_id(), this,
+                                                  request->enable_profile());
+            RETURN_IF_ERROR(p->init(request));
+            load_stream = p.get();
+            _load_streams_map[load_id] = std::move(p);
         }
         load_stream->add_source(request->src_id());
     }
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index ff742012774..9e875f3b829 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -38,8 +38,7 @@ public:
                   FifoThreadPool* light_work_pool);
     ~LoadStreamMgr();
 
-    Status open_load_stream(const POpenLoadStreamRequest* request,
-                            LoadStreamSharedPtr& load_stream);
+    Status open_load_stream(const POpenLoadStreamRequest* request, 
LoadStream*& load_stream);
     void clear_load(UniqueId loadid);
     void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
         for (int i = 0; i < _num_threads * 2; i++) {
@@ -56,7 +55,7 @@ public:
 
 private:
     std::mutex _lock;
-    std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
+    std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;
 
     uint32_t _num_threads = 0;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 6abc972634a..6d2f76e4c23 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -388,14 +388,14 @@ void 
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
             
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
         }
 
-        LoadStreamSharedPtr load_stream;
+        LoadStream* load_stream = nullptr;
         auto st = _load_stream_mgr->open_load_stream(request, load_stream);
         if (!st.ok()) {
             st.to_protobuf(response->mutable_status());
             return;
         }
 
-        stream_options.handler = load_stream.get();
+        stream_options.handler = load_stream;
         stream_options.idle_timeout_ms = request->idle_timeout_ms();
         
DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
                         { stream_options.idle_timeout_ms = 1; });
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index d248e1019f4..f1e06aa1776 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -375,12 +375,12 @@ public:
                 
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
             }
 
-            LoadStreamSharedPtr load_stream;
+            LoadStream* load_stream;
             LOG(INFO) << "total streams: " << request->total_streams();
             EXPECT_GT(request->total_streams(), 0);
             auto st = _load_stream_mgr->open_load_stream(request, load_stream);
 
-            stream_options.handler = load_stream.get();
+            stream_options.handler = load_stream;
 
             StreamId streamid;
             if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to