This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7058b31edd465b53bc4525f4dc4188f1bbdf6f5c
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Mon May 27 15:29:38 2024 +0800

    [fix](move-memtable) clear load streams before shutdown SegmentFileWriterThreadPool (#35217)
---
 be/src/runtime/load_stream.h         |  2 +-
 be/src/runtime/load_stream_mgr.cpp   | 14 ++++++++------
 be/src/runtime/load_stream_mgr.h     |  5 ++---
 be/src/service/internal_service.cpp  |  4 ++--
 be/test/runtime/load_stream_test.cpp |  4 ++--
 5 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index be1cb7756a1..c61a2d163de 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -169,6 +169,6 @@ private:
     QueryThreadContext _query_thread_context;
 };
 
-using LoadStreamSharedPtr = std::shared_ptr<LoadStream>;
+using LoadStreamPtr = std::unique_ptr<LoadStream>;
 
 } // namespace doris
diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp
index 8d9d37c5d3a..c99f69f5607 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -44,23 +44,25 @@ LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
 }
 
 LoadStreamMgr::~LoadStreamMgr() {
+    _load_streams_map.clear();
     _file_writer_thread_pool->shutdown();
 }
 
 Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request,
-                                       LoadStreamSharedPtr& load_stream) {
+                                       LoadStream*& load_stream) {
     UniqueId load_id(request->load_id());
 
     {
         std::lock_guard<decltype(_lock)> l(_lock);
         auto it = _load_streams_map.find(load_id);
         if (it != _load_streams_map.end()) {
-            load_stream = it->second;
+            load_stream = it->second.get();
         } else {
-            load_stream = std::make_shared<LoadStream>(request->load_id(), this,
-                                                       request->enable_profile());
-            RETURN_IF_ERROR(load_stream->init(request));
-            _load_streams_map[load_id] = load_stream;
+            auto p = std::make_unique<LoadStream>(request->load_id(), this,
+                                                  request->enable_profile());
+            RETURN_IF_ERROR(p->init(request));
+            load_stream = p.get();
+            _load_streams_map[load_id] = std::move(p);
         }
         load_stream->add_source(request->src_id());
     }
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index ff742012774..9e875f3b829 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -38,8 +38,7 @@ public:
                   FifoThreadPool* light_work_pool);
     ~LoadStreamMgr();
 
-    Status open_load_stream(const POpenLoadStreamRequest* request,
-                            LoadStreamSharedPtr& load_stream);
+    Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream);
     void clear_load(UniqueId loadid);
     void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
         for (int i = 0; i < _num_threads * 2; i++) {
@@ -56,7 +55,7 @@ public:
 
 private:
     std::mutex _lock;
-    std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
+    std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;
 
     uint32_t _num_threads = 0;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 6abc972634a..6d2f76e4c23 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -388,14 +388,14 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
             tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
         }
 
-        LoadStreamSharedPtr load_stream;
+        LoadStream* load_stream = nullptr;
         auto st = _load_stream_mgr->open_load_stream(request, load_stream);
         if (!st.ok()) {
             st.to_protobuf(response->mutable_status());
             return;
         }
 
-        stream_options.handler = load_stream.get();
+        stream_options.handler = load_stream;
         stream_options.idle_timeout_ms = request->idle_timeout_ms();
         DBUG_EXECUTE_IF("PInternalServiceImpl.open_load_stream.set_idle_timeout",
                         { stream_options.idle_timeout_ms = 1; });
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index d248e1019f4..f1e06aa1776 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -375,12 +375,12 @@ public:
                 tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
             }
 
-            LoadStreamSharedPtr load_stream;
+            LoadStream* load_stream;
             LOG(INFO) << "total streams: " << request->total_streams();
             EXPECT_GT(request->total_streams(), 0);
             auto st = _load_stream_mgr->open_load_stream(request, load_stream);
 
-            stream_options.handler = load_stream.get();
+            stream_options.handler = load_stream;
 
             StreamId streamid;
             if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org