This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ad9eab7c85 [refactor](execenv) remove shared ptr from exec env 
(#46034)
3ad9eab7c85 is described below

commit 3ad9eab7c853b19fab288b6cdfd50a2e59e29ba1
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Sun Dec 29 22:04:41 2024 +0800

    [refactor](execenv) remove shared ptr from exec env (#46034)
    
    ### What problem does this PR solve?
    
    ExecEnv should be the last object to be destructed, so it should
    not own any shared_ptr. If it owns a shared_ptr, we cannot guarantee
    the destruction order.
---
 be/src/cloud/cloud_stream_load_executor.h          |  2 ++
 be/src/runtime/exec_env.cpp                        |  6 ----
 be/src/runtime/exec_env.h                          | 30 +++++++----------
 be/src/runtime/exec_env_init.cpp                   | 39 ++++++++++++++++++----
 be/src/runtime/memory/lru_cache_policy.h           | 12 +++----
 be/test/http/stream_load_test.cpp                  |  4 +--
 be/test/olap/wal/wal_manager_test.cpp              | 13 +++++---
 .../runtime/routine_load_task_executor_test.cpp    | 19 +++++++----
 be/test/testutil/run_all_tests.cpp                 |  1 -
 be/test/vec/exec/vwal_scanner_test.cpp             |  3 +-
 10 files changed, 76 insertions(+), 53 deletions(-)

diff --git a/be/src/cloud/cloud_stream_load_executor.h 
b/be/src/cloud/cloud_stream_load_executor.h
index b0cb91d06ac..d04e55feba5 100644
--- a/be/src/cloud/cloud_stream_load_executor.h
+++ b/be/src/cloud/cloud_stream_load_executor.h
@@ -21,6 +21,8 @@
 namespace doris {
 
 class CloudStreamLoadExecutor final : public StreamLoadExecutor {
+    ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor);
+
 public:
     CloudStreamLoadExecutor(ExecEnv* exec_env);
 
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index ab24d7ca192..e3a71261b67 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -38,12 +38,6 @@
 
 namespace doris {
 
-ExecEnv::ExecEnv() = default;
-
-ExecEnv::~ExecEnv() {
-    destroy();
-}
-
 #ifdef BE_TEST
 void ExecEnv::set_inverted_index_searcher_cache(
         segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) 
{
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 636ce2bf288..0c9a4158ebc 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -244,14 +244,14 @@ public:
     }
     LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
     LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
-    std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return 
_new_load_stream_mgr; }
+    NewLoadStreamMgr* new_load_stream_mgr() { return 
_new_load_stream_mgr.get(); }
     SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
     doris::vectorized::SpillStreamManager* spill_stream_mgr() { return 
_spill_stream_mgr; }
     GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }
 
     const std::vector<StorePath>& store_paths() const { return _store_paths; }
 
-    std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return 
_stream_load_executor; }
+    StreamLoadExecutor* stream_load_executor() { return 
_stream_load_executor.get(); }
     RoutineLoadTaskExecutor* routine_load_task_executor() { return 
_routine_load_task_executor; }
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
     vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
@@ -273,12 +273,10 @@ public:
         _memtable_memory_limiter.reset(limiter);
     }
     void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = 
cluster_info; }
-    void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> 
new_load_stream_mgr) {
-        this->_new_load_stream_mgr = new_load_stream_mgr;
-    }
-    void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> 
stream_load_executor) {
-        this->_stream_load_executor = stream_load_executor;
-    }
+    void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& 
new_load_stream_mgr);
+    void clear_new_load_stream_mgr();
+    void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& 
stream_load_executor);
+    void clear_stream_load_executor();
 
     void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
     void set_inverted_index_searcher_cache(
@@ -294,10 +292,9 @@ public:
     void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
         this->_routine_load_task_executor = r;
     }
-    void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = 
wm; }
-    void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
-        this->_dummy_lru_cache = dummy_lru_cache;
-    }
+    void set_wal_mgr(std::unique_ptr<WalManager>&& wm);
+    void clear_wal_mgr();
+
     void set_write_cooldown_meta_executors();
     static void set_tracking_memory(bool tracking_memory) {
         _s_tracking_memory.store(tracking_memory, std::memory_order_release);
@@ -331,7 +328,6 @@ public:
         return _inverted_index_query_cache;
     }
     QueryCache* get_query_cache() { return _query_cache; }
-    std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return 
_dummy_lru_cache; }
 
     pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
         return _runtime_filter_timer_queue;
@@ -429,13 +425,12 @@ private:
     BrokerMgr* _broker_mgr = nullptr;
     LoadChannelMgr* _load_channel_mgr = nullptr;
     std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
-    // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control 
its life cycle.
-    std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
+    std::unique_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
     BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
     BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
     BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
 
-    std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
+    std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
     RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
     SmallFileMgr* _small_file_mgr = nullptr;
     HeartbeatFlags* _heartbeat_flags = nullptr;
@@ -446,7 +441,7 @@ private:
     std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
     std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
     std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
-    std::shared_ptr<WalManager> _wal_manager;
+    std::unique_ptr<WalManager> _wal_manager;
     DNSCache* _dns_cache = nullptr;
     std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;
 
@@ -473,7 +468,6 @@ private:
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
     segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
     QueryCache* _query_cache = nullptr;
-    std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
     std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;
 
     pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 2d7554e7029..df66315ff05 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -185,6 +185,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() {
 #endif
 }
 
+ExecEnv::ExecEnv() = default;
+
+ExecEnv::~ExecEnv() {
+    destroy();
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -290,16 +296,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
             _store_paths.size() * config::flush_thread_num_per_store,
             static_cast<size_t>(CpuInfo::num_cores()) * 
config::max_flush_thread_num_per_cpu);
     _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
-    _new_load_stream_mgr = NewLoadStreamMgr::create_shared();
+    _new_load_stream_mgr = NewLoadStreamMgr::create_unique();
     _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
     _streaming_client_cache =
             new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", 
"streaming");
     _function_client_cache =
             new 
BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
     if (config::is_cloud_mode()) {
-        _stream_load_executor = 
std::make_shared<CloudStreamLoadExecutor>(this);
+        _stream_load_executor = CloudStreamLoadExecutor::create_unique(this);
     } else {
-        _stream_load_executor = StreamLoadExecutor::create_shared(this);
+        _stream_load_executor = StreamLoadExecutor::create_unique(this);
     }
     _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
     RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
@@ -309,7 +315,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
     _delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
     _file_cache_open_fd_cache = std::make_unique<io::FDCache>();
-    _wal_manager = WalManager::create_shared(this, 
config::group_commit_wal_path);
+    _wal_manager = WalManager::create_unique(this, 
config::group_commit_wal_path);
     _dns_cache = new DNSCache();
     _write_cooldown_meta_executors = 
std::make_unique<WriteCooldownMetaExecutors>();
     _spill_stream_mgr = new 
vectorized::SpillStreamManager(std::move(spill_store_map));
@@ -464,8 +470,6 @@ Status ExecEnv::_init_mem_env() {
         return Status::InternalError(ss.str());
     }
 
-    _dummy_lru_cache = std::make_shared<DummyLRUCache>();
-
     _cache_manager = CacheManager::create_global_instance();
 
     int64_t storage_cache_limit =
@@ -681,7 +685,30 @@ void ExecEnv::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
 }
+#ifdef BE_TEST
+void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& 
new_load_stream_mgr) {
+    this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
+}
+
+void ExecEnv::clear_new_load_stream_mgr() {
+    this->_new_load_stream_mgr.reset();
+}
 
+void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& 
stream_load_executor) {
+    this->_stream_load_executor = std::move(stream_load_executor);
+}
+
+void ExecEnv::clear_stream_load_executor() {
+    this->_stream_load_executor.reset();
+}
+
+void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
+    this->_wal_manager = std::move(wm);
+}
+void ExecEnv::clear_wal_mgr() {
+    this->_wal_manager.reset();
+}
+#endif
 // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a 
Stop method.
 // We need to stop all threads before releasing resource.
 void ExecEnv::destroy() {
diff --git a/be/src/runtime/memory/lru_cache_policy.h 
b/be/src/runtime/memory/lru_cache_policy.h
index 7e73f2dd76b..7e02247efb8 100644
--- a/be/src/runtime/memory/lru_cache_policy.h
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -44,8 +44,7 @@ public:
                     new ShardedLRUCache(type_string(type), capacity, 
lru_cache_type, num_shards,
                                         element_count_capacity, is_lru_k));
         } else {
-            CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
-            _cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
+            _cache = std::make_shared<doris::DummyLRUCache>();
         }
         _init_mem_tracker(lru_cache_type_string(lru_cache_type));
     }
@@ -64,8 +63,7 @@ public:
                                         cache_value_time_extractor, 
cache_value_check_timestamp,
                                         element_count_capacity, is_lru_k));
         } else {
-            CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
-            _cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
+            _cache = std::make_shared<doris::DummyLRUCache>();
         }
         _init_mem_tracker(lru_cache_type_string(lru_cache_type));
     }
@@ -157,7 +155,7 @@ public:
         std::lock_guard<std::mutex> l(_lock);
         COUNTER_SET(_freed_entrys_counter, (int64_t)0);
         COUNTER_SET(_freed_memory_counter, (int64_t)0);
-        if (_stale_sweep_time_s <= 0 || _cache == 
ExecEnv::GetInstance()->get_dummy_lru_cache()) {
+        if (_stale_sweep_time_s <= 0 || 
std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
             return;
         }
         if (exceed_prune_limit()) {
@@ -204,7 +202,7 @@ public:
         std::lock_guard<std::mutex> l(_lock);
         COUNTER_SET(_freed_entrys_counter, (int64_t)0);
         COUNTER_SET(_freed_memory_counter, (int64_t)0);
-        if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
+        if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
             return;
         }
         if ((force && mem_consumption() != 0) || exceed_prune_limit()) {
@@ -246,7 +244,7 @@ public:
         COUNTER_SET(_freed_entrys_counter, (int64_t)0);
         COUNTER_SET(_freed_memory_counter, (int64_t)0);
         COUNTER_SET(_cost_timer, (int64_t)0);
-        if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
+        if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
             return 0;
         }
 
diff --git a/be/test/http/stream_load_test.cpp 
b/be/test/http/stream_load_test.cpp
index d797c081f41..faa582704d1 100644
--- a/be/test/http/stream_load_test.cpp
+++ b/be/test/http/stream_load_test.cpp
@@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* 
arg) {
 
 TEST_F(StreamLoadTest, TestHeader) {
     // 1G
-    auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), 
config::group_commit_wal_path);
+    auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), 
config::group_commit_wal_path);
     static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
     static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 
0));
     static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 
0));
-    ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
+    ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr));
     // 1. empty info
     {
         auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
diff --git a/be/test/olap/wal/wal_manager_test.cpp 
b/be/test/olap/wal/wal_manager_test.cpp
index 32162593fc0..5a6ce49067b 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -59,12 +59,12 @@ public:
         _env->_cluster_info->master_fe_addr.hostname = "host name";
         _env->_cluster_info->master_fe_addr.port = 1234;
         _env->_cluster_info->backend_id = 1001;
-        _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
+        _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
         _env->_internal_client_cache = new 
BrpcClientCache<PBackendService_Stub>();
         _env->_function_client_cache = new 
BrpcClientCache<PFunctionService_Stub>();
-        _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
+        _env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
         _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
-        _env->_wal_manager = WalManager::create_shared(_env, wal_dir.string());
+        _env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string()));
         k_stream_load_begin_result = TLoadTxnBeginResult();
     }
     void TearDown() override {
@@ -78,6 +78,9 @@ public:
         SAFE_DELETE(_env->_function_client_cache);
         SAFE_DELETE(_env->_internal_client_cache);
         SAFE_DELETE(_env->_cluster_info);
+        _env->clear_new_load_stream_mgr();
+        _env->clear_stream_load_executor();
+        //_env->clear_wal_mgr();
     }
 
     void prepare() {
@@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) {
 }
 
 TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
-    auto wal_mgr = WalManager::create_shared(_env, 
config::group_commit_wal_path);
+    auto wal_mgr = WalManager::create_unique(_env, 
config::group_commit_wal_path);
     static_cast<void>(wal_mgr->init());
-    _env->set_wal_mgr(wal_mgr);
+    _env->set_wal_mgr(std::move(wal_mgr));
 
     // 1T
     size_t available_bytes = 1099511627776;
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp 
b/be/test/runtime/routine_load_task_executor_test.cpp
index 5c2b39bce1f..080d6ff4bc5 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -49,23 +49,28 @@ public:
     RoutineLoadTaskExecutorTest() = default;
     ~RoutineLoadTaskExecutorTest() override = default;
 
+    ExecEnv* _env = nullptr;
+
     void SetUp() override {
+        _env = ExecEnv::GetInstance();
         k_stream_load_begin_result = TLoadTxnBeginResult();
         k_stream_load_commit_result = TLoadTxnCommitResult();
         k_stream_load_rollback_result = TLoadTxnRollbackResult();
         k_stream_load_put_result = TStreamLoadPutResult();
 
-        _env.set_cluster_info(new ClusterInfo());
-        _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
-        
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
+        _env->set_cluster_info(new ClusterInfo());
+        _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
+        
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env));
 
         config::max_routine_load_thread_pool_size = 1024;
         config::max_consumer_num_per_group = 3;
     }
 
-    void TearDown() override { delete _env.cluster_info(); }
-
-    ExecEnv _env;
+    void TearDown() override {
+        delete _env->cluster_info();
+        _env->clear_new_load_stream_mgr();
+        _env->clear_stream_load_executor();
+    }
 };
 
 TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
@@ -92,7 +97,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
 
     task.__set_kafka_load_info(k_info);
 
-    RoutineLoadTaskExecutor executor(&_env);
+    RoutineLoadTaskExecutor executor(_env);
     Status st;
     st = executor.init(1024 * 1024);
     EXPECT_TRUE(st.ok());
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index 59933db80e5..1208141a8fa 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -58,7 +58,6 @@ int main(int argc, char** argv) {
     
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
     doris::ExecEnv::GetInstance()->set_process_profile(
             doris::ProcessProfile::create_global_instance());
-    
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
     doris::ExecEnv::GetInstance()->set_storage_page_cache(
             doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
     doris::ExecEnv::GetInstance()->set_segment_loader(new 
doris::SegmentLoader(1000, 1000));
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index 5c4056a8c24..2e6d4bf5cde 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -79,6 +79,7 @@ public:
         
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
                       fmt::format("fail to delete dir={}", _wal_dir));
         SAFE_STOP(_env->_wal_manager);
+        _env->clear_wal_mgr();
     }
 
 protected:
@@ -286,7 +287,7 @@ void VWalScannerTest::init() {
     _env->_cluster_info->master_fe_addr.hostname = "host name";
     _env->_cluster_info->master_fe_addr.port = _backend_id;
     _env->_cluster_info->backend_id = 1001;
-    _env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
+    _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir));
     std::string base_path;
     auto st = _env->_wal_manager->_init_wal_dirs_info();
     st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, 
_label_1, base_path,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to