This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3ad9eab7c85 [refactor](execenv) remove shared ptr from exec env (#46034) 3ad9eab7c85 is described below commit 3ad9eab7c853b19fab288b6cdfd50a2e59e29ba1 Author: yiguolei <guo...@selectdb.com> AuthorDate: Sun Dec 29 22:04:41 2024 +0800 [refactor](execenv) remove shared ptr from exec env (#46034) ### What problem does this PR solve? ExecEnv should be the last object to be destructed, so it should not own any shared_ptr. If it owns any shared_ptr, we cannot guarantee the destruction order. --- be/src/cloud/cloud_stream_load_executor.h | 2 ++ be/src/runtime/exec_env.cpp | 6 ---- be/src/runtime/exec_env.h | 30 +++++++---------- be/src/runtime/exec_env_init.cpp | 39 ++++++++++++++++++---- be/src/runtime/memory/lru_cache_policy.h | 12 +++---- be/test/http/stream_load_test.cpp | 4 +-- be/test/olap/wal/wal_manager_test.cpp | 13 +++++--- .../runtime/routine_load_task_executor_test.cpp | 19 +++++++---- be/test/testutil/run_all_tests.cpp | 1 - be/test/vec/exec/vwal_scanner_test.cpp | 3 +- 10 files changed, 76 insertions(+), 53 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.h b/be/src/cloud/cloud_stream_load_executor.h index b0cb91d06ac..d04e55feba5 100644 --- a/be/src/cloud/cloud_stream_load_executor.h +++ b/be/src/cloud/cloud_stream_load_executor.h @@ -21,6 +21,8 @@ namespace doris { class CloudStreamLoadExecutor final : public StreamLoadExecutor { + ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor); + public: CloudStreamLoadExecutor(ExecEnv* exec_env); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index ab24d7ca192..e3a71261b67 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -38,12 +38,6 @@ namespace doris { -ExecEnv::ExecEnv() = default; - -ExecEnv::~ExecEnv() { - destroy(); -} - #ifdef BE_TEST void ExecEnv::set_inverted_index_searcher_cache( segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) { 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 636ce2bf288..0c9a4158ebc 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -244,14 +244,14 @@ public: } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); } - std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } + NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; } GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; } const std::vector<StorePath>& store_paths() const { return _store_paths; } - std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; } + StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } @@ -273,12 +273,10 @@ public: _memtable_memory_limiter.reset(limiter); } void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; } - void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) { - this->_new_load_stream_mgr = new_load_stream_mgr; - } - void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> stream_load_executor) { - this->_stream_load_executor = stream_load_executor; - } + void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr); + void clear_new_load_stream_mgr(); + void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor); + void clear_stream_load_executor(); void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine); void 
set_inverted_index_searcher_cache( @@ -294,10 +292,9 @@ public: void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { this->_routine_load_task_executor = r; } - void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; } - void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) { - this->_dummy_lru_cache = dummy_lru_cache; - } + void set_wal_mgr(std::unique_ptr<WalManager>&& wm); + void clear_wal_mgr(); + void set_write_cooldown_meta_executors(); static void set_tracking_memory(bool tracking_memory) { _s_tracking_memory.store(tracking_memory, std::memory_order_release); @@ -331,7 +328,6 @@ public: return _inverted_index_query_cache; } QueryCache* get_query_cache() { return _query_cache; } - std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; } pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { return _runtime_filter_timer_queue; @@ -429,13 +425,12 @@ private: BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; std::unique_ptr<LoadStreamMgr> _load_stream_mgr; - // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. 
- std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr; + std::unique_ptr<NewLoadStreamMgr> _new_load_stream_mgr; BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr; BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr; BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr; - std::shared_ptr<StreamLoadExecutor> _stream_load_executor; + std::unique_ptr<StreamLoadExecutor> _stream_load_executor; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; @@ -446,7 +441,7 @@ private: std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter; std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool; std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool; - std::shared_ptr<WalManager> _wal_manager; + std::unique_ptr<WalManager> _wal_manager; DNSCache* _dns_cache = nullptr; std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors; @@ -473,7 +468,6 @@ private: segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; QueryCache* _query_cache = nullptr; - std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr; std::unique_ptr<io::FDCache> _file_cache_open_fd_cache; pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 2d7554e7029..df66315ff05 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -185,6 +185,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() { #endif } +ExecEnv::ExecEnv() = default; + +ExecEnv::~ExecEnv() { + destroy(); +} + Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths, const std::vector<StorePath>& spill_store_paths, const std::set<std::string>& broken_paths) { @@ -290,16 +296,16 @@ Status 
ExecEnv::_init(const std::vector<StorePath>& store_paths, _store_paths.size() * config::flush_thread_num_per_store, static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads); - _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); + _new_load_stream_mgr = NewLoadStreamMgr::create_unique(); _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); _streaming_client_cache = new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming"); _function_client_cache = new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol); if (config::is_cloud_mode()) { - _stream_load_executor = std::make_shared<CloudStreamLoadExecutor>(this); + _stream_load_executor = CloudStreamLoadExecutor::create_unique(this); } else { - _stream_load_executor = StreamLoadExecutor::create_shared(this); + _stream_load_executor = StreamLoadExecutor::create_unique(this); } _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); @@ -309,7 +315,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _load_stream_map_pool = std::make_unique<LoadStreamMapPool>(); _delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>(); _file_cache_open_fd_cache = std::make_unique<io::FDCache>(); - _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); + _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path); _dns_cache = new DNSCache(); _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>(); _spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map)); @@ -464,8 +470,6 @@ Status ExecEnv::_init_mem_env() { return Status::InternalError(ss.str()); } - _dummy_lru_cache = std::make_shared<DummyLRUCache>(); - _cache_manager = CacheManager::create_global_instance(); int64_t 
storage_cache_limit = @@ -681,7 +685,30 @@ void ExecEnv::_deregister_metrics() { DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); } +#ifdef BE_TEST +void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) { + this->_new_load_stream_mgr = std::move(new_load_stream_mgr); +} + +void ExecEnv::clear_new_load_stream_mgr() { + this->_new_load_stream_mgr.reset(); +} +void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) { + this->_stream_load_executor = std::move(stream_load_executor); +} + +void ExecEnv::clear_stream_load_executor() { + this->_stream_load_executor.reset(); +} + +void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) { + this->_wal_manager = std::move(wm); +} +void ExecEnv::clear_wal_mgr() { + this->_wal_manager.reset(); +} +#endif // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. // We need to stop all threads before releasing resource. 
void ExecEnv::destroy() { diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 7e73f2dd76b..7e02247efb8 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -44,8 +44,7 @@ public: new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, element_count_capacity, is_lru_k)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared<doris::DummyLRUCache>(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -64,8 +63,7 @@ public: cache_value_time_extractor, cache_value_check_timestamp, element_count_capacity, is_lru_k)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared<doris::DummyLRUCache>(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -157,7 +155,7 @@ public: std::lock_guard<std::mutex> l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_stale_sweep_time_s <= 0 || _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) { return; } if (exceed_prune_limit()) { @@ -204,7 +202,7 @@ public: std::lock_guard<std::mutex> l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) { return; } if ((force && mem_consumption() != 0) || exceed_prune_limit()) { @@ -246,7 +244,7 @@ public: COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); COUNTER_SET(_cost_timer, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if 
(std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) { return 0; } diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index d797c081f41..faa582704d1 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) { TEST_F(StreamLoadTest, TestHeader) { // 1G - auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path); static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0)); static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0)); static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0)); - ExecEnv::GetInstance()->set_wal_mgr(wal_mgr); + ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr)); // 1. empty info { auto* evhttp_req = evhttp_request_new(nullptr, nullptr); diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index 32162593fc0..5a6ce49067b 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -59,12 +59,12 @@ public: _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = 1234; _env->_cluster_info->backend_id = 1001; - _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); - _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_stream_load_executor = StreamLoadExecutor::create_unique(_env); _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)}; - _env->_wal_manager = WalManager::create_shared(_env, wal_dir.string()); + 
_env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string())); k_stream_load_begin_result = TLoadTxnBeginResult(); } void TearDown() override { @@ -78,6 +78,9 @@ public: SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_cluster_info); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + //_env->clear_wal_mgr(); } void prepare() { @@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) { } TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { - auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path); static_cast<void>(wal_mgr->init()); - _env->set_wal_mgr(wal_mgr); + _env->set_wal_mgr(std::move(wal_mgr)); // 1T size_t available_bytes = 1099511627776; diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 5c2b39bce1f..080d6ff4bc5 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -49,23 +49,28 @@ public: RoutineLoadTaskExecutorTest() = default; ~RoutineLoadTaskExecutorTest() override = default; + ExecEnv* _env = nullptr; + void SetUp() override { + _env = ExecEnv::GetInstance(); k_stream_load_begin_result = TLoadTxnBeginResult(); k_stream_load_commit_result = TLoadTxnCommitResult(); k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env.set_cluster_info(new ClusterInfo()); - _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); - _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); + _env->set_cluster_info(new ClusterInfo()); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); + _env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env)); config::max_routine_load_thread_pool_size = 1024; config::max_consumer_num_per_group 
= 3; } - void TearDown() override { delete _env.cluster_info(); } - - ExecEnv _env; + void TearDown() override { + delete _env->cluster_info(); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + } }; TEST_F(RoutineLoadTaskExecutorTest, exec_task) { @@ -92,7 +97,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); - RoutineLoadTaskExecutor executor(&_env); + RoutineLoadTaskExecutor executor(_env); Status st; st = executor.init(1024 * 1024); EXPECT_TRUE(st.ok()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 59933db80e5..1208141a8fa 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -58,7 +58,6 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance()); doris::ExecEnv::GetInstance()->set_process_profile( doris::ProcessProfile::create_global_instance()); - doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000)); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 5c4056a8c24..2e6d4bf5cde 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -79,6 +79,7 @@ public: WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir), fmt::format("fail to delete dir={}", _wal_dir)); SAFE_STOP(_env->_wal_manager); + _env->clear_wal_mgr(); } protected: @@ -286,7 +287,7 @@ void VWalScannerTest::init() { _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = _backend_id; _env->_cluster_info->backend_id = 1001; - _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); 
+ _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir)); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org