This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c7ae2a7d22 [Refactor & Bugfix](static variables) move some static vairables to exec_env (#24029) c7ae2a7d22 is described below commit c7ae2a7d220429a39c52ded077c0d5b484223f38 Author: zhiqqqq <seuhezhiqi...@163.com> AuthorDate: Wed Sep 13 09:27:03 2023 +0800 [Refactor & Bugfix](static variables) move some static vairables to exec_env (#24029) --- be/src/common/daemon.cpp | 7 +- be/src/common/daemon.h | 2 +- be/src/common/resource_tls.cpp | 69 ------ be/src/common/resource_tls.h | 30 --- be/src/http/action/file_cache_action.cpp | 4 +- be/src/io/cache/block/block_file_cache_factory.cpp | 6 +- be/src/io/cache/block/block_file_cache_factory.h | 2 +- .../io/cache/block/cached_remote_file_reader.cpp | 8 +- be/src/io/fs/s3_file_write_bufferpool.h | 4 +- be/src/olap/olap_define.h | 14 ++ be/src/olap/page_cache.cpp | 18 +- be/src/olap/page_cache.h | 9 +- .../rowset/segment_v2/inverted_index_cache.cpp | 13 +- .../olap/rowset/segment_v2/inverted_index_cache.h | 29 ++- be/src/olap/schema_cache.cpp | 10 +- be/src/olap/schema_cache.h | 10 +- be/src/olap/segment_loader.cpp | 8 +- be/src/olap/segment_loader.h | 9 +- be/src/olap/storage_engine.cpp | 67 +++--- be/src/olap/storage_engine.h | 9 +- be/src/olap/tablet_schema_cache.cpp | 12 +- be/src/olap/tablet_schema_cache.h | 22 +- be/src/pipeline/task_scheduler.cpp | 4 +- be/src/pipeline/task_scheduler.h | 2 +- be/src/runtime/broker_mgr.cpp | 2 +- be/src/runtime/broker_mgr.h | 3 +- be/src/runtime/exec_env.cpp | 5 +- be/src/runtime/exec_env.h | 102 +++++++-- be/src/runtime/exec_env_init.cpp | 240 +++++++++++++++++---- be/src/runtime/external_scan_context_mgr.cpp | 2 +- be/src/runtime/external_scan_context_mgr.h | 4 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/fragment_mgr.h | 2 + be/src/runtime/frontend_info.h | 1 + be/src/runtime/group_commit_mgr.cpp | 9 +- be/src/runtime/group_commit_mgr.h | 2 + be/src/runtime/load_channel_mgr.cpp | 5 +- be/src/runtime/load_channel_mgr.h | 2 + be/src/runtime/load_path_mgr.cpp | 2 +- be/src/runtime/load_path_mgr.h | 3 +- be/src/runtime/memory/cache_manager.h | 12 +- be/src/runtime/result_buffer_mgr.cpp | 2 +- be/src/runtime/result_buffer_mgr.h | 5 +- be/src/runtime/routine_load/data_consumer_pool.h | 4 +- .../routine_load/routine_load_task_executor.cpp | 9 +- .../routine_load/routine_load_task_executor.h | 2 + be/src/runtime/task_group/task_group_manager.cpp | 3 - be/src/runtime/task_group/task_group_manager.h | 4 +- be/src/runtime/user_function_cache.cpp | 4 +- be/src/service/brpc_service.cpp | 12 +- be/src/service/doris_main.cpp | 112 +++------- be/src/service/http_service.cpp | 4 + be/src/service/http_service.h | 2 + be/src/service/point_query_executor.cpp | 22 +- be/src/service/point_query_executor.h | 12 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 18 +- be/src/vec/exec/scan/scanner_scheduler.h | 4 +- be/test/common/resource_tls_test.cpp | 49 ----- be/test/olap/delete_bitmap_calculator_test.cpp | 4 +- be/test/olap/delete_handler_test.cpp | 7 +- be/test/olap/delta_writer_test.cpp | 5 +- .../olap/engine_storage_migration_task_test.cpp | 7 +- be/test/olap/memtable_flush_executor_test.cpp | 6 +- be/test/olap/memtable_memory_limiter_test.cpp | 12 +- be/test/olap/ordered_data_compaction_test.cpp | 5 +- be/test/olap/rowid_conversion_test.cpp | 4 +- be/test/olap/rowset/beta_rowset_test.cpp | 9 +- be/test/olap/rowset/rowset_meta_manager_test.cpp | 3 + be/test/olap/tablet_cooldown_test.cpp | 6 +- be/test/olap/tablet_mgr_test.cpp | 3 + be/test/olap/tablet_test.cpp | 4 +- be/test/olap/txn_manager_test.cpp | 1 + be/test/runtime/external_scan_context_mgr_test.cpp | 8 +- be/test/runtime/load_stream_test.cpp | 8 +- .../runtime/routine_load_task_executor_test.cpp | 5 +- be/test/testutil/run_all_tests.cpp | 15 +- be/test/vec/olap/vertical_compaction_test.cpp | 4 +- 77 files changed, 636 insertions(+), 516 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index c17bf1367f..43a3445382 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -63,10 +63,6 @@ namespace doris { -Daemon::~Daemon() { - stop(); -} - void Daemon::tcmalloc_gc_thread() { // TODO All cache GC wish to be supported #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ @@ -389,7 +385,9 @@ void Daemon::start() { } void Daemon::stop() { + LOG(INFO) << "Doris daemon is stopping."; if (_stop_background_threads_latch.count() == 0) { + LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; return; } _stop_background_threads_latch.count_down(); @@ -398,6 +396,7 @@ void Daemon::stop() { t->join(); } } + LOG(INFO) << "Doris daemon stopped after background threads are joined."; } } // namespace doris diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index c552f55f03..139584ba93 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -28,7 +28,7 @@ namespace doris { class Daemon { public: Daemon() : _stop_background_threads_latch(1) {} - ~Daemon(); + ~Daemon() = default; // Start background threads void start(); diff --git a/be/src/common/resource_tls.cpp b/be/src/common/resource_tls.cpp deleted file mode 100644 index 9b5ddc6815..0000000000 --- a/be/src/common/resource_tls.cpp +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "common/resource_tls.h" - -#include <gen_cpp/Types_types.h> -#include <pthread.h> - -#include <ostream> - -#include "common/logging.h" - -namespace doris { - -static pthread_key_t s_resource_key; -static bool s_is_init = false; - -static void resource_destructor(void* value) { - TResourceInfo* info = (TResourceInfo*)value; - if (info != nullptr) { - delete info; - } -} - -void ResourceTls::init() { - int ret = pthread_key_create(&s_resource_key, resource_destructor); - if (ret != 0) { - LOG(ERROR) << "create pthread key for resource failed."; - return; - } - s_is_init = true; -} - -TResourceInfo* ResourceTls::get_resource_tls() { - if (!s_is_init) { - return nullptr; - } - return (TResourceInfo*)pthread_getspecific(s_resource_key); -} - -int ResourceTls::set_resource_tls(TResourceInfo* info) { - if (!s_is_init) { - return -1; - } - TResourceInfo* old_info = (TResourceInfo*)pthread_getspecific(s_resource_key); - - int ret = pthread_setspecific(s_resource_key, info); - if (ret == 0) { - // OK, now we delete old one - delete old_info; - } - return ret; -} - -} // namespace doris diff --git a/be/src/common/resource_tls.h b/be/src/common/resource_tls.h deleted file mode 100644 index deed16496d..0000000000 --- a/be/src/common/resource_tls.h +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -namespace doris { - -class TResourceInfo; -class ResourceTls { -public: - static void init(); - static TResourceInfo* get_resource_tls(); - static int set_resource_tls(TResourceInfo*); -}; - -} // namespace doris diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp index db432d5598..c55e639eea 100644 --- a/be/src/http/action/file_cache_action.cpp +++ b/be/src/http/action/file_cache_action.cpp @@ -42,9 +42,9 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri if (operation == "release") { size_t released = 0; if (req->param("base_path") != "") { - released = io::FileCacheFactory::instance().try_release(req->param("base_path")); + released = io::FileCacheFactory::instance()->try_release(req->param("base_path")); } else { - released = io::FileCacheFactory::instance().try_release(); + released = io::FileCacheFactory::instance()->try_release(); } EasyJson json; json["released_elements"] = released; diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp b/be/src/io/cache/block/block_file_cache_factory.cpp index 8741ced5c0..6bbbba8023 100644 --- a/be/src/io/cache/block/block_file_cache_factory.cpp +++ b/be/src/io/cache/block/block_file_cache_factory.cpp @@ -31,15 +31,15 @@ #include "io/cache/block/block_file_cache_settings.h" #include "io/cache/block/block_lru_file_cache.h" #include "io/fs/local_file_system.h" +#include "runtime/exec_env.h" namespace doris { class TUniqueId; namespace io { -FileCacheFactory& FileCacheFactory::instance() { - static FileCacheFactory ret; - return ret; +FileCacheFactory* FileCacheFactory::instance() { + return ExecEnv::GetInstance()->file_cache_factory(); } size_t FileCacheFactory::try_release() { diff --git a/be/src/io/cache/block/block_file_cache_factory.h b/be/src/io/cache/block/block_file_cache_factory.h index c8ed2893f1..0b6b6504c9 100644 --- a/be/src/io/cache/block/block_file_cache_factory.h +++ b/be/src/io/cache/block/block_file_cache_factory.h @@ -37,7 +37,7 @@ namespace io { */ class FileCacheFactory { public: - static FileCacheFactory& instance(); + static FileCacheFactory* instance(); void create_file_cache(const std::string& cache_base_path, const FileCacheSettings& file_cache_settings, Status* status); diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index f1662418cf..86ecb14a75 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -47,21 +47,21 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader _is_doris_table = opts.is_doris_table; if (_is_doris_table) { _cache_key = IFileCache::hash(path().filename().native()); - _cache = FileCacheFactory::instance().get_by_path(_cache_key); + _cache = FileCacheFactory::instance()->get_by_path(_cache_key); } else { // Use path and modification time to build cache key std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); _cache_key = IFileCache::hash(unique_path); if (!opts.cache_base_path.empty()) { // from query session variable: file_cache_base_path - _cache = FileCacheFactory::instance().get_by_path(opts.cache_base_path); + _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); if (_cache == nullptr) { LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path << ", using random instead."; - _cache = FileCacheFactory::instance().get_by_path(_cache_key); + _cache = FileCacheFactory::instance()->get_by_path(_cache_key); } } - _cache = FileCacheFactory::instance().get_by_path(path().native()); + _cache = FileCacheFactory::instance()->get_by_path(path().native()); } } diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h index ad5f698f98..7e8bf01e19 100644 --- a/be/src/io/fs/s3_file_write_bufferpool.h +++ b/be/src/io/fs/s3_file_write_bufferpool.h @@ -28,6 +28,7 @@ #include "common/config.h" #include "common/status.h" #include "io/fs/s3_common.h" +#include "runtime/exec_env.h" #include "util/slice.h" namespace doris { @@ -126,8 +127,7 @@ public: doris::ThreadPool* thread_pool); static S3FileBufferPool* GetInstance() { - static S3FileBufferPool _pool; - return &_pool; + return ExecEnv::GetInstance()->get_s3_file_buffer_pool(); } void reclaim(Slice buf) { diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index e0e1d919a5..901e0403f0 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -179,6 +179,20 @@ const std::string REMOTE_TABLET_GC_PREFIX = "tgc_"; } \ } while (0) +#define SAFE_STOP(ptr) \ + do { \ + if (nullptr != ptr) { \ + ptr->stop(); \ + } \ + } while (0) + +#define SAFE_SHUTDOWN(ptr) \ + do { \ + if (nullptr != ptr) { \ + ptr->shutdown(); \ + } \ + } while (0) + #ifndef BUILD_VERSION #define BUILD_VERSION "Unknown" #endif diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 57049bdc6d..1779c293f8 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -21,16 +21,16 @@ #include <ostream> -namespace doris { - -StoragePageCache* StoragePageCache::_s_instance = nullptr; +#include "runtime/exec_env.h" -void StoragePageCache::create_global_cache(size_t capacity, int32_t index_cache_percentage, - int64_t pk_index_cache_capacity, uint32_t num_shards) { - DCHECK(_s_instance == nullptr); - static StoragePageCache instance(capacity, index_cache_percentage, pk_index_cache_capacity, - num_shards); - _s_instance = &instance; +namespace doris { +StoragePageCache* StoragePageCache::create_global_cache(size_t capacity, + int32_t index_cache_percentage, + int64_t pk_index_cache_capacity, + uint32_t num_shards) { + StoragePageCache* res = new StoragePageCache(capacity, index_cache_percentage, + pk_index_cache_capacity, num_shards); + return res; } StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percentage, diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index e1e48f2856..c5c4e99099 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -127,13 +127,13 @@ public: static constexpr uint32_t kDefaultNumShards = 16; // Create global instance of this class - static void create_global_cache(size_t capacity, int32_t index_cache_percentage, - int64_t pk_index_cache_capacity, - uint32_t num_shards = kDefaultNumShards); + static StoragePageCache* create_global_cache(size_t capacity, int32_t index_cache_percentage, + int64_t pk_index_cache_capacity, + uint32_t num_shards = kDefaultNumShards); // Return global instance. // Client should call create_global_cache before. - static StoragePageCache* instance() { return _s_instance; } + static StoragePageCache* instance() { return ExecEnv::GetInstance()->get_storage_page_cache(); } StoragePageCache(size_t capacity, int32_t index_cache_percentage, int64_t pk_index_cache_capacity, uint32_t num_shards); @@ -165,7 +165,6 @@ public: private: StoragePageCache(); - static StoragePageCache* _s_instance; int32_t _index_cache_percentage = 0; std::unique_ptr<DataPageCache> _data_page_cache = nullptr; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index 055365cf31..f399f752c3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -31,6 +31,7 @@ #include "olap/olap_common.h" #include "olap/rowset/segment_v2/inverted_index_compound_directory.h" #include "olap/rowset/segment_v2/inverted_index_compound_reader.h" +#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/defer_op.h" #include "util/runtime_profile.h" @@ -38,8 +39,6 @@ namespace doris { namespace segment_v2 { -InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr; - IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name) { @@ -55,10 +54,10 @@ IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::File return index_searcher; } -void InvertedIndexSearcherCache::create_global_instance(size_t capacity, uint32_t num_shards) { - DCHECK(_s_instance == nullptr); - static InvertedIndexSearcherCache instance(capacity, num_shards); - _s_instance = &instance; +InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance( + size_t capacity, uint32_t num_shards) { + InvertedIndexSearcherCache* res = new InvertedIndexSearcherCache(capacity, num_shards); + return res; } InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) @@ -211,8 +210,6 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCa return lru_handle; } -InvertedIndexQueryCache* InvertedIndexQueryCache::_s_instance = nullptr; - bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle) { if (key.encode().empty()) { return false; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h index c67e17ddda..19a9eb7734 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h @@ -37,6 +37,7 @@ #include "io/fs/path.h" #include "olap/lru_cache.h" #include "olap/rowset/segment_v2/inverted_index_query_type.h" +#include "runtime/exec_env.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker.h" #include "util/slice.h" @@ -72,7 +73,8 @@ public: // Create global instance of this class. // "capacity" is the capacity of lru cache. - static void create_global_instance(size_t capacity, uint32_t num_shards = 16); + static InvertedIndexSearcherCache* create_global_instance(size_t capacity, + uint32_t num_shards = 16); void reset() { _cache.reset(); @@ -80,15 +82,11 @@ public: // Reset or clear the state of the object. } - static void reset_global_instance() { - if (_s_instance != nullptr) { - _s_instance->reset(); - } - } - // Return global instance. // Client should call create_global_cache before. - static InvertedIndexSearcherCache* instance() { return _s_instance; } + static InvertedIndexSearcherCache* instance() { + return ExecEnv::GetInstance()->get_inverted_index_searcher_cache(); + } static IndexSearcherPtr build_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir, @@ -123,7 +121,6 @@ private: Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key, CacheValue* value); private: - static InvertedIndexSearcherCache* _s_instance; std::unique_ptr<MemTracker> _mem_tracker = nullptr; }; @@ -223,15 +220,16 @@ public: }; // Create global instance of this class - static void create_global_cache(size_t capacity, uint32_t num_shards = 16) { - DCHECK(_s_instance == nullptr); - static InvertedIndexQueryCache instance(capacity, num_shards); - _s_instance = &instance; + static InvertedIndexQueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) { + InvertedIndexQueryCache* res = new InvertedIndexQueryCache(capacity, num_shards); + return res; } // Return global instance. // Client should call create_global_cache before. - static InvertedIndexQueryCache* instance() { return _s_instance; } + static InvertedIndexQueryCache* instance() { + return ExecEnv::GetInstance()->get_inverted_index_query_cache(); + } InvertedIndexQueryCache() = delete; @@ -246,9 +244,6 @@ public: InvertedIndexQueryCacheHandle* handle); int64_t mem_consumption(); - -private: - static InvertedIndexQueryCache* _s_instance; }; class InvertedIndexQueryCacheHandle { diff --git a/be/src/olap/schema_cache.cpp b/be/src/olap/schema_cache.cpp index 39d1a60a4c..7bf6b592c6 100644 --- a/be/src/olap/schema_cache.cpp +++ b/be/src/olap/schema_cache.cpp @@ -35,7 +35,9 @@ namespace doris { -SchemaCache* SchemaCache::_s_instance = nullptr; +SchemaCache* SchemaCache::instance() { + return ExecEnv::GetInstance()->schema_cache(); +} // format: tabletId-unique_id1-uniqueid2...-version-type std::string SchemaCache::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, @@ -69,10 +71,4 @@ std::string SchemaCache::get_schema_key(int32_t tablet_id, const std::vector<TCo return key; } -void SchemaCache::create_global_instance(size_t capacity) { - DCHECK(_s_instance == nullptr); - static SchemaCache instance(capacity); - _s_instance = &instance; -} - } // namespace doris \ No newline at end of file diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h index 5d94c92837..dbce5336d8 100644 --- a/be/src/olap/schema_cache.h +++ b/be/src/olap/schema_cache.h @@ -48,7 +48,7 @@ class SchemaCache : public LRUCachePolicy { public: enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 }; - static SchemaCache* instance() { return _s_instance; } + static SchemaCache* instance(); static void create_global_instance(size_t capacity); @@ -62,7 +62,7 @@ public: // Get a shared cached schema from cache, schema_key is a subset of column unique ids template <typename SchemaType> SchemaType get_schema(const std::string& schema_key) { - if (!_s_instance || schema_key.empty()) { + if (!instance() || schema_key.empty()) { return {}; } auto lru_handle = _cache->lookup(schema_key); @@ -84,7 +84,7 @@ public: // Insert a shared Schema into cache, schema_key is full column unique ids template <typename SchemaType> void insert_schema(const std::string& key, SchemaType schema) { - if (!_s_instance || key.empty()) { + if (!instance() || key.empty()) { return; } CacheValue* value = new CacheValue; @@ -115,12 +115,12 @@ public: SchemaSPtr schema = nullptr; }; -private: SchemaCache(size_t capacity) : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, LRUCacheType::NUMBER, config::schema_cache_sweep_time_sec) {} + +private: static constexpr char SCHEMA_DELIMITER = '-'; - static SchemaCache* _s_instance; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 617b5e6fff..8d759cce8e 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -24,12 +24,8 @@ namespace doris { -SegmentLoader* SegmentLoader::_s_instance = nullptr; - -void SegmentLoader::create_global_instance(size_t capacity) { - DCHECK(_s_instance == nullptr); - static SegmentLoader instance(capacity); - _s_instance = &instance; +SegmentLoader* SegmentLoader::instance() { + return ExecEnv::GetInstance()->segment_loader(); } bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) { diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index d5849e6097..d6b1d07940 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -95,6 +95,8 @@ public: class SegmentLoader { public: + static SegmentLoader* instance(); + // Create global instance of this class. // "capacity" is the capacity of lru cache. // TODO: Currently we use the number of rowset as the cache capacity. @@ -102,11 +104,6 @@ public: // This is because currently we cannot accurately estimate the memory occupied by a segment. // After the estimation of segment memory usage is provided later, it is recommended // to use Memory as the capacity limit of the cache. - static void create_global_instance(size_t capacity); - - // Return global instance. - // Client should call create_global_cache before. - static SegmentLoader* instance() { return _s_instance; } SegmentLoader(size_t capacity) { _segment_cache = std::make_unique<SegmentCache>(capacity); } @@ -121,8 +118,6 @@ public: private: SegmentLoader(); - - static SegmentLoader* _s_instance; std::unique_ptr<SegmentCache> _segment_cache = nullptr; }; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index b425076fc0..8bace65617 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -93,8 +93,6 @@ using namespace ErrorCode; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS); -StorageEngine* StorageEngine::_s_instance = nullptr; - static Status _validate_options(const EngineOptions& options) { if (options.store_paths.empty()) { return Status::InternalError("store paths is empty"); @@ -102,14 +100,11 @@ static Status _validate_options(const EngineOptions& options) { return Status::OK(); } -Status StorageEngine::open(const EngineOptions& options, - std::unique_ptr<StorageEngine>* engine_ptr) { - RETURN_IF_ERROR(_validate_options(options)); - LOG(INFO) << "starting backend using uid:" << options.backend_uid.to_string(); - std::unique_ptr<StorageEngine> engine(new StorageEngine(options)); - RETURN_NOT_OK_STATUS_WITH_WARN(engine->_open(), "open engine failed"); +Status StorageEngine::open() { + RETURN_IF_ERROR(_validate_options(_options)); + LOG(INFO) << "starting backend using uid:" << _options.backend_uid.to_string(); + RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed"); LOG(INFO) << "success to init storage engine."; - *engine_ptr = std::move(engine); return Status::OK(); } @@ -130,7 +125,6 @@ StorageEngine::StorageEngine(const EngineOptions& options) _default_rowset_type(BETA_ROWSET), _heartbeat_flags(nullptr), _stream_load_recorder(nullptr) { - _s_instance = this; REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() { // std::lock_guard<std::mutex> lock(_gc_mutex); return _unused_rowsets.size(); @@ -139,30 +133,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) StorageEngine::~StorageEngine() { stop(); - - DEREGISTER_HOOK_METRIC(unused_rowsets_count); - - if (_base_compaction_thread_pool) { - _base_compaction_thread_pool->shutdown(); - } - if (_cumu_compaction_thread_pool) { - _cumu_compaction_thread_pool->shutdown(); - } - if (_single_replica_compaction_thread_pool) { - _single_replica_compaction_thread_pool->shutdown(); - } - - if (_seg_compaction_thread_pool) { - _seg_compaction_thread_pool->shutdown(); - } - if (_tablet_meta_checkpoint_thread_pool) { - _tablet_meta_checkpoint_thread_pool->shutdown(); - } - if (_cold_data_compaction_thread_pool) { - _cold_data_compaction_thread_pool->shutdown(); - } _clear(); - _s_instance = nullptr; } Status StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) { @@ -543,7 +514,10 @@ void StorageEngine::_exit_if_too_many_disks_are_failed() { } void StorageEngine::stop() { - if (_stopped) return; + if (_stopped) { + LOG(WARNING) << "Storage engine is stopped twice."; + return; + } // trigger the waiting threads notify_listeners(); @@ -582,7 +556,32 @@ void StorageEngine::stop() { THREADS_JOIN(_path_gc_threads); THREADS_JOIN(_path_scan_threads); #undef THREADS_JOIN + + if (_base_compaction_thread_pool) { + _base_compaction_thread_pool->shutdown(); + } + if (_cumu_compaction_thread_pool) { + _cumu_compaction_thread_pool->shutdown(); + } + if (_single_replica_compaction_thread_pool) { + _single_replica_compaction_thread_pool->shutdown(); + } + + if (_seg_compaction_thread_pool) { + _seg_compaction_thread_pool->shutdown(); + } + if (_tablet_meta_checkpoint_thread_pool) { + _tablet_meta_checkpoint_thread_pool->shutdown(); + } + if (_cold_data_compaction_thread_pool) { + _cold_data_compaction_thread_pool->shutdown(); + } + + _memtable_flush_executor.reset(nullptr); + _calc_delete_bitmap_executor.reset(nullptr); + _stopped = true; + LOG(INFO) << "Storage engine is stopped."; } void StorageEngine::_clear() { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 02e6ec5f7b..b5cb99ec21 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -47,6 +47,7 @@ #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet.h" #include "olap/task/index_builder.h" +#include "runtime/exec_env.h" #include "runtime/heartbeat_flags.h" #include "util/countdown_latch.h" @@ -81,9 +82,9 @@ public: StorageEngine(const EngineOptions& options); ~StorageEngine(); - static Status open(const EngineOptions& options, std::unique_ptr<StorageEngine>* engine_ptr); + [[nodiscard]] Status open(); - static StorageEngine* instance() { return _s_instance; } + static StorageEngine* instance() { return ExecEnv::GetInstance()->get_storage_engine(); } Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile); @@ -178,6 +179,7 @@ public: // option: update disk usage after sweep Status start_trash_sweep(double* usage, bool ignore_guard = false); + // Must call stop() before storage_engine is deconstructed void stop(); void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request, @@ -373,8 +375,7 @@ private: int32_t _effective_cluster_id; bool _is_all_cluster_id_exist; - static StorageEngine* _s_instance; - bool _stopped; + std::atomic_bool _stopped {false}; std::mutex _gc_mutex; // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index 2e4b221492..90880665ad 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -19,12 +19,7 @@ namespace doris { -TabletSchemaCache::~TabletSchemaCache() { - stop_and_join(); -} - TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) { - DCHECK(_s_instance != nullptr); std::lock_guard guard(_mtx); auto iter = _cache.find(key); if (iter == _cache.end()) { @@ -41,11 +36,18 @@ TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) { return iter->second; } +void TabletSchemaCache::start() { + std::thread t(&TabletSchemaCache::_recycle, this); + t.detach(); + LOG(INFO) << "TabletSchemaCache started"; +} + void TabletSchemaCache::stop() { _should_stop = true; while (!_is_stopped) { std::this_thread::sleep_for(std::chrono::seconds(1)); } + LOG(INFO) << "TabletSchemaCache stopped"; } /** diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 6c692aaf46..93798983c8 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -24,31 +24,28 @@ #include <unordered_map> #include "olap/tablet_schema.h" +#include "runtime/exec_env.h" #include "util/doris_metrics.h" namespace doris { class TabletSchemaCache { public: - ~TabletSchemaCache(); + ~TabletSchemaCache() = default; - static void create_global_schema_cache() { - DCHECK(_s_instance == nullptr); - static TabletSchemaCache instance; - _s_instance = &instance; - std::thread t(&TabletSchemaCache::_recycle, _s_instance); - t.detach(); + static TabletSchemaCache* create_global_schema_cache() { + TabletSchemaCache* res = new TabletSchemaCache(); + return res; } - static TabletSchemaCache* instance() { return _s_instance; } - - static void stop_and_join() { - DCHECK(_s_instance != nullptr); - _s_instance->stop(); + static TabletSchemaCache* instance() { + return ExecEnv::GetInstance()->get_tablet_schema_cache(); } TabletSchemaSPtr insert(const std::string& key); + void start(); + void stop(); private: @@ -58,7 +55,6 @@ private: void _recycle(); private: - static inline TabletSchemaCache* _s_instance = nullptr; std::mutex _mtx; std::unordered_map<std::string, TabletSchemaSPtr> _cache; std::atomic_bool _should_stop = {false}; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 0ce4115c0c..c4278c3807 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -189,7 +189,7 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks, } TaskScheduler::~TaskScheduler() { - shutdown(); + stop(); } Status TaskScheduler::start() { @@ -340,7 +340,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) task->fragment_context()->close_a_pipeline(); } -void TaskScheduler::shutdown() { +void TaskScheduler::stop() { if (!this->_shutdown.load()) { this->_shutdown.store(true); _blocked_task_scheduler->shutdown(); diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index ac9389c088..13b9e734d6 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -89,7 +89,7 @@ public: Status start(); - void shutdown(); + void stop(); TaskQueue* task_queue() const { return _task_queue.get(); } diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp index 6d17f32ffb..613b916d06 100644 --- a/be/src/runtime/broker_mgr.cpp +++ b/be/src/runtime/broker_mgr.cpp @@ -53,7 +53,7 @@ BrokerMgr::BrokerMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_ }); } -BrokerMgr::~BrokerMgr() { +void BrokerMgr::stop() { DEREGISTER_HOOK_METRIC(broker_count); _stop_background_threads_latch.count_down(); if (_ping_thread) { diff --git a/be/src/runtime/broker_mgr.h b/be/src/runtime/broker_mgr.h index fd7a67e43c..d9238aed2c 100644 --- a/be/src/runtime/broker_mgr.h +++ b/be/src/runtime/broker_mgr.h @@ -35,8 +35,9 @@ class Thread; class BrokerMgr { public: BrokerMgr(ExecEnv* exec_env); - ~BrokerMgr(); + ~BrokerMgr() = default; void init(); + void stop(); const std::string& get_client_id(const TNetworkAddress& address); private: diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index c30fb20db8..27986f5de3 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -23,6 +23,7 @@ #include <utility> #include "common/config.h" +#include "olap/olap_define.h" #include "runtime/fragment_mgr.h" #include "runtime/frontend_info.h" #include "time.h" @@ -31,10 +32,8 @@ namespace doris { -ExecEnv::ExecEnv() = default; - ExecEnv::~ExecEnv() { - _destroy(); + destroy(); } const std::string& ExecEnv::token() const { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f6c61d26f3..fd54d44166 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -17,7 +17,7 @@ #pragma once -#include <gen_cpp/HeartbeatService_types.h> +#include <common/multi_version.h> #include <stddef.h> #include <algorithm> @@ -32,12 +32,13 @@ #include "common/status.h" #include "olap/memtable_memory_limiter.h" +#include "olap/olap_define.h" #include "olap/options.h" +#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" #include "vec/common/hash_table/phmap_fwd_decl.h" namespace doris { -struct FrontendInfo; namespace vectorized { class VDataStreamMgr; class ScannerScheduler; @@ -49,6 +50,15 @@ class TaskScheduler; namespace taskgroup { class TaskGroupManager; } +namespace io { +class S3FileBufferPool; +class FileCacheFactory; +} // namespace io +namespace segment_v2 { +class InvertedIndexSearcherCache; +class InvertedIndexQueryCache; +} // namespace segment_v2 + class BfdParser; class BrokerMgr; template <class T> @@ -80,6 +90,14 @@ class HeartbeatFlags; class FrontendServiceClient; class FileMetaCache; class GroupCommitMgr; +class TabletSchemaCache; +class UserFunctionCache; +class SchemaCache; +class StoragePageCache; +class SegmentLoader; +class LookupConnectionCache; +class RowCache; +class CacheManager; inline bool k_doris_exit = false; @@ -89,9 +107,15 @@ inline bool k_doris_exit = false; // once to properly initialise service state. class ExecEnv { public: + // Empty destructor because the compiler-generated one requires full + // declarations for classes in scoped_ptrs. + ~ExecEnv(); + // Initial exec environment. must call this to init all - static Status init(ExecEnv* env, const std::vector<StorePath>& store_paths); - static void destroy(ExecEnv* exec_env); + [[nodiscard]] static Status init(ExecEnv* env, const std::vector<StorePath>& store_paths); + + // Stop all threads and delete resources. + void destroy(); /// Returns the first created exec env instance. In a normal doris, this is /// the only instance. In test setups with multiple ExecEnv's per process, @@ -101,10 +125,6 @@ public: return &s_exec_env; } - // Empty destructor because the compiler-generated one requires full - // declarations for classes in scoped_ptrs. - ~ExecEnv(); - static bool ready() { return _s_ready.load(std::memory_order_acquire); } const std::string& token() const; ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; } @@ -152,12 +172,15 @@ public: void init_download_cache_buf(); void init_download_cache_required_components(); Status init_pipeline_task_scheduler(); + void init_file_cache_factory(); char* get_download_cache_buf(ThreadPoolToken* token) { if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) { return nullptr; } return _download_cache_buf_map[token].get(); } + io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; } + UserFunctionCache* user_function_cache() { return _user_function_cache; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } ResultCache* result_cache() { return _result_cache; } TMasterInfo* master_info() { return _master_info; } @@ -185,14 +208,11 @@ public: FileMetaCache* file_meta_cache() { return _file_meta_cache; } MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } #ifdef BE_TEST + void set_ready() { this->_s_ready = true; } + void set_not_ready() { this->_s_ready = false; } void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) { _memtable_memory_limiter.reset(limiter); } -#endif - vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; } - std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; } - - // only for unit test void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) { this->_new_load_stream_mgr = new_load_stream_mgr; @@ -201,16 +221,45 @@ public: this->_stream_load_executor = stream_load_executor; } + void set_storage_engine(StorageEngine* se) { this->_storage_engine = se; } + void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; } + void set_tablet_schema_cache(TabletSchemaCache* c) { this->_tablet_schema_cache = c; } + void set_storage_page_cache(StoragePageCache* c) { this->_storage_page_cache = c; } + void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; } + void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { + this->_routine_load_task_executor = r; + } + +#endif + vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; } + std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; } + void wait_for_all_tasks_done(); void update_frontends(const std::vector<TFrontendInfo>& new_infos); std::map<TNetworkAddress, FrontendInfo> get_frontends(); std::map<TNetworkAddress, FrontendInfo> get_running_frontends(); + TabletSchemaCache* get_tablet_schema_cache() { return _tablet_schema_cache; } + StorageEngine* get_storage_engine() { return _storage_engine; } + io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; } + SchemaCache* schema_cache() { return _schema_cache; } + StoragePageCache* get_storage_page_cache() { return _storage_page_cache; } + SegmentLoader* segment_loader() { return _segment_loader; } + LookupConnectionCache* get_lookup_connection_cache() { return _lookup_connection_cache; } + RowCache* get_row_cache() { return _row_cache; } + CacheManager* get_cache_manager() { return _cache_manager; } + segment_v2::InvertedIndexSearcherCache* get_inverted_index_searcher_cache() { + return _inverted_index_searcher_cache; + } + segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() { + return _inverted_index_query_cache; + } + private: - ExecEnv(); + ExecEnv() = default; - Status _init(const std::vector<StorePath>& store_paths); + [[nodiscard]] Status _init(const std::vector<StorePath>& store_paths); void _destroy(); Status _init_mem_env(); @@ -221,6 +270,8 @@ private: inline static std::atomic_bool _s_ready {false}; std::vector<StorePath> _store_paths; + io::FileCacheFactory* _file_cache_factory = nullptr; + UserFunctionCache* _user_function_cache = nullptr; // Leave protected so that subclasses can override ExternalScanContextMgr* _external_scan_context_mgr = nullptr; doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr; @@ -268,6 +319,7 @@ private: BfdParser* _bfd_parser = nullptr; BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; + // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr; BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr; BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr; @@ -289,6 +341,22 @@ private: std::mutex _frontends_lock; std::map<TNetworkAddress, FrontendInfo> _frontends; GroupCommitMgr* _group_commit_mgr = nullptr; + + // Maybe we should use unique_ptr, but it need complete type, which means we need + // to include many headers, and for some cpp file that do not need class like TabletSchemaCache, + // these redundancy header could introduce potential bug, at least, more header means slow compile. + // So we choose to use raw pointer, please remember to delete these pointer in deconstructor. + TabletSchemaCache* _tablet_schema_cache = nullptr; + io::S3FileBufferPool* _s3_buffer_pool = nullptr; + StorageEngine* _storage_engine = nullptr; + SchemaCache* _schema_cache = nullptr; + StoragePageCache* _storage_page_cache = nullptr; + SegmentLoader* _segment_loader = nullptr; + LookupConnectionCache* _lookup_connection_cache = nullptr; + RowCache* _row_cache = nullptr; + CacheManager* _cache_manager = nullptr; + segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; + segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; }; template <> @@ -305,4 +373,8 @@ ExecEnv::get_client_cache<TPaloBrokerServiceClient>() { return _broker_client_cache; } +inline segment_v2::InvertedIndexQueryCache* GetInvertedIndexQueryCache() { + return ExecEnv::GetInstance()->get_inverted_index_query_cache(); +} + } // namespace doris diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0bc375aac4..bcdbf49801 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -16,6 +16,7 @@ // under the License. // IWYU pragma: no_include <bthread/errno.h> +#include <common/multi_version.h> #include <errno.h> // IWYU pragma: keep #include <gen_cpp/HeartbeatService_types.h> #include <gen_cpp/Metrics_types.h> @@ -36,7 +37,9 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "io/cache/block/block_file_cache_factory.h" #include "io/fs/file_meta_cache.h" +#include "io/fs/s3_file_write_bufferpool.h" #include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/options.h" @@ -44,6 +47,7 @@ #include "olap/rowset/segment_v2/inverted_index_cache.h" #include "olap/schema_cache.h" #include "olap/segment_loader.h" +#include "olap/storage_engine.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/block_spill_manager.h" @@ -69,7 +73,9 @@ #include "runtime/stream_load/stream_load_executor.h" #include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" +#include "runtime/user_function_cache.h" #include "service/backend_options.h" +#include "service/backend_service.h" #include "service/point_query_executor.h" #include "util/bfd_parser.h" #include "util/bit_util.h" @@ -82,6 +88,7 @@ #include "util/parse_util.h" #include "util/pretty_printer.h" #include "util/threadpool.h" +#include "util/thrift_rpc_helper.h" #include "util/timezone_utils.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -135,7 +142,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { } init_doris_metrics(store_paths); _store_paths = store_paths; - + _user_function_cache = new UserFunctionCache(); + _user_function_cache->init(doris::config::user_function_dir); _external_scan_context_mgr = new ExternalScanContextMgr(this); _vstream_mgr = new doris::vectorized::VDataStreamMgr(); _result_mgr = new ResultBufferMgr(); @@ -146,6 +154,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { TimezoneUtils::load_timezone_names(); + // _global_zone_cache is not owned by ExecEnv ... maybe should refactor. _global_zone_cache = std::make_unique<vectorized::ZoneList>(); TimezoneUtils::load_timezones_to_cache(*_global_zone_cache); @@ -176,7 +185,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { .set_max_threads(std::numeric_limits<int>::max()) .set_max_queue_size(config::fragment_pool_queue_size) .build(&_join_node_thread_pool); - + init_file_cache_factory(); RETURN_IF_ERROR(init_pipeline_task_scheduler()); _task_group_manager = new taskgroup::TaskGroupManager(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); @@ -205,12 +214,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _result_mgr->init(); Status status = _load_path_mgr->init(); if (!status.ok()) { - LOG(ERROR) << "load path mgr init failed." << status; - exit(-1); + LOG(ERROR) << "Load path mgr init failed. " << status; + return status; } _broker_mgr->init(); _small_file_mgr->init(); - _scanner_scheduler->init(this); + status = _scanner_scheduler->init(); + if (!status.ok()) { + LOG(ERROR) << "Scanner scheduler init failed. " << status; + return status; + } _init_mem_env(); @@ -218,7 +231,33 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); + + _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(); + _tablet_schema_cache->start(); + + // S3 buffer pool + _s3_buffer_pool = new io::S3FileBufferPool(); + _s3_buffer_pool->init(config::s3_write_buffer_whole_size, config::s3_write_buffer_size, + this->buffered_reader_prefetch_thread_pool()); + + // Storage engine + doris::EngineOptions options; + options.store_paths = store_paths; + options.backend_uid = doris::UniqueId::gen_uid(); + _storage_engine = new StorageEngine(options); + auto st = _storage_engine->open(); + if (!st.ok()) { + LOG(ERROR) << "Lail to open StorageEngine, res=" << st; + return st; + } + _storage_engine->set_heartbeat_flags(this->heartbeat_flags()); + if (st = _storage_engine->start_bg_threads(); !st.ok()) { + LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st; + return st; + } + _s_ready = true; + return Status::OK(); } @@ -244,6 +283,56 @@ Status ExecEnv::init_pipeline_task_scheduler() { return Status::OK(); } +void ExecEnv::init_file_cache_factory() { + // Load file cache before starting up daemon threads to make sure StorageEngine is read. + if (doris::config::enable_file_cache) { + _file_cache_factory = new io::FileCacheFactory(); + io::IFileCache::init(); + std::unordered_set<std::string> cache_path_set; + std::vector<doris::CachePath> cache_paths; + Status olap_res = + doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); + if (!olap_res) { + LOG(FATAL) << "parse config file cache path failed, path=" + << doris::config::file_cache_path; + exit(-1); + } + + std::unique_ptr<doris::ThreadPool> file_cache_init_pool; + doris::ThreadPoolBuilder("FileCacheInitThreadPool") + .set_min_threads(cache_paths.size()) + .set_max_threads(cache_paths.size()) + .build(&file_cache_init_pool); + + std::list<doris::Status> cache_status; + for (auto& cache_path : cache_paths) { + if (cache_path_set.find(cache_path.path) != cache_path_set.end()) { + LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path); + continue; + } + + olap_res = file_cache_init_pool->submit_func(std::bind( + &io::FileCacheFactory::create_file_cache, _file_cache_factory, cache_path.path, + cache_path.init_settings(), &(cache_status.emplace_back()))); + + if (!olap_res.ok()) { + LOG(FATAL) << "failed to init file cache, err: " << olap_res; + exit(-1); + } + cache_path_set.emplace(cache_path.path); + } + + file_cache_init_pool->wait(); + for (const auto& status : cache_status) { + if (!status.ok()) { + LOG(FATAL) << "failed to init file cache, err: " << status; + exit(-1); + } + } + } + return; +} + Status ExecEnv::_init_mem_env() { bool is_percent = false; std::stringstream ss; @@ -262,7 +351,7 @@ Status ExecEnv::_init_mem_env() { } // 3. init storage page cache - CacheManager::create_global_instance(); + _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(), @@ -286,8 +375,8 @@ Status ExecEnv::_init_mem_env() { while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 2) { pk_storage_page_cache_limit = storage_cache_limit / 2; } - StoragePageCache::create_global_cache(storage_cache_limit, index_percentage, - pk_storage_page_cache_limit, num_shards); + _storage_page_cache = StoragePageCache::create_global_cache( + storage_cache_limit, index_percentage, pk_storage_page_cache_limit, num_shards); LOG(INFO) << "Storage page cache memory limit: " << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) << ", origin config value: " << config::storage_page_cache_limit; @@ -300,7 +389,7 @@ Status ExecEnv::_init_mem_env() { // Reason same as buffer_pool_limit row_cache_mem_limit = row_cache_mem_limit / 2; } - RowCache::create_global_cache(row_cache_mem_limit); + _row_cache = RowCache::create_global_cache(row_cache_mem_limit); LOG(INFO) << "Row cache memory limit: " << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES) << ", origin config value: " << config::row_cache_mem_limit; @@ -322,11 +411,12 @@ Status ExecEnv::_init_mem_env() { } LOG(INFO) << "segment_cache_capacity <= fd_number * 2 / 5, fd_number: " << fd_number << " segment_cache_capacity: " << segment_cache_capacity; - SegmentLoader::create_global_instance(segment_cache_capacity); + _segment_loader = new SegmentLoader(segment_cache_capacity); - SchemaCache::create_global_instance(config::schema_cache_capacity); + _schema_cache = new SchemaCache(config::schema_cache_capacity); - LookupConnectionCache::create_global_instance(config::lookup_connection_cache_bytes_limit); + _lookup_connection_cache = LookupConnectionCache::create_global_instance( + config::lookup_connection_cache_bytes_limit); // use memory limit int64_t inverted_index_cache_limit = @@ -336,7 +426,8 @@ Status ExecEnv::_init_mem_env() { // Reason same as buffer_pool_limit inverted_index_cache_limit = inverted_index_cache_limit / 2; } - InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit); + _inverted_index_searcher_cache = + InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit); LOG(INFO) << "Inverted index searcher cache memory limit: " << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_searcher_cache_limit; @@ -349,7 +440,8 @@ Status ExecEnv::_init_mem_env() { // Reason same as buffer_pool_limit inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2; } - InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit); + _inverted_index_query_cache = + InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit); LOG(INFO) << "Inverted index query match cache memory limit: " << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; @@ -410,58 +502,120 @@ void ExecEnv::_deregister_metrics() { DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size); } -void ExecEnv::_destroy() { +// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. +// We need to stop all threads before releasing resource. +void ExecEnv::destroy() { //Only destroy once after init if (!ready()) { return; } // Memory barrier to prevent other threads from accessing destructed resources _s_ready = false; + + SAFE_STOP(_tablet_schema_cache); + SAFE_STOP(_load_channel_mgr); + SAFE_STOP(_scanner_scheduler); + SAFE_STOP(_broker_mgr); + SAFE_STOP(_load_path_mgr); + SAFE_STOP(_result_mgr); + SAFE_STOP(_group_commit_mgr); + // _routine_load_task_executor should be stopped before _new_load_stream_mgr. + SAFE_STOP(_routine_load_task_executor); + SAFE_STOP(_pipeline_task_scheduler); + SAFE_STOP(_pipeline_task_group_scheduler); + SAFE_STOP(_external_scan_context_mgr); + SAFE_STOP(_fragment_mgr); + // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped. + _new_load_stream_mgr.reset(); + _stream_load_executor.reset(); + SAFE_STOP(_storage_engine); + SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); + SAFE_SHUTDOWN(_join_node_thread_pool); + SAFE_SHUTDOWN(_send_report_thread_pool); + SAFE_SHUTDOWN(_send_batch_thread_pool); + SAFE_SHUTDOWN(_serial_download_cache_thread_token); + SAFE_SHUTDOWN(_download_cache_thread_pool); + + // Free resource after threads are stopped. + // Some threads are still running, like threads created by _new_load_stream_mgr ... + SAFE_DELETE(_s3_buffer_pool); + SAFE_DELETE(_tablet_schema_cache); _deregister_metrics(); - SAFE_DELETE(_internal_client_cache); - SAFE_DELETE(_function_client_cache); SAFE_DELETE(_load_channel_mgr); + _memtable_memory_limiter.reset(nullptr); + + // shared_ptr maybe no need to be reset + // _brpc_iobuf_block_memory_tracker.reset(); + // _page_no_cache_mem_tracker.reset(); + // _experimental_mem_tracker.reset(); + // _orphan_mem_tracker.reset(); + + SAFE_DELETE(_block_spill_mgr); + SAFE_DELETE(_inverted_index_query_cache); + SAFE_DELETE(_inverted_index_searcher_cache); + SAFE_DELETE(_lookup_connection_cache); + SAFE_DELETE(_schema_cache); + SAFE_DELETE(_segment_loader); + SAFE_DELETE(_row_cache); + + // StorageEngine must be destoried before _page_no_cache_mem_tracker.reset + // StorageEngine must be destoried before _cache_manager destory + SAFE_DELETE(_storage_engine); + + // _scanner_scheduler must be desotried before _storage_page_cache + SAFE_DELETE(_scanner_scheduler); + // _storage_page_cache must be destoried before _cache_manager + SAFE_DELETE(_storage_page_cache); + // cache_manager must be destoried after _inverted_index_query_cache + // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 + SAFE_DELETE(_cache_manager); + + SAFE_DELETE(_small_file_mgr); SAFE_DELETE(_broker_mgr); - SAFE_DELETE(_bfd_parser); SAFE_DELETE(_load_path_mgr); - SAFE_DELETE(_pipeline_task_scheduler); - SAFE_DELETE(_pipeline_task_group_scheduler); - SAFE_DELETE(_task_group_manager); + SAFE_DELETE(_result_mgr); + SAFE_DELETE(_file_meta_cache); + SAFE_DELETE(_group_commit_mgr); + SAFE_DELETE(_routine_load_task_executor); + // _stream_load_executor + SAFE_DELETE(_function_client_cache); + SAFE_DELETE(_internal_client_cache); + + SAFE_DELETE(_bfd_parser); + SAFE_DELETE(_result_cache); SAFE_DELETE(_fragment_mgr); + SAFE_DELETE(_task_group_manager); + SAFE_DELETE(_pipeline_task_group_scheduler); + SAFE_DELETE(_pipeline_task_scheduler); + SAFE_DELETE(_file_cache_factory); + // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? + _join_node_thread_pool.reset(nullptr); + _send_report_thread_pool.reset(nullptr); + _buffered_reader_prefetch_thread_pool.reset(nullptr); + _send_batch_thread_pool.reset(nullptr); + SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); SAFE_DELETE(_backend_client_cache); - SAFE_DELETE(_result_mgr); SAFE_DELETE(_result_queue_mgr); - SAFE_DELETE(_routine_load_task_executor); + + SAFE_DELETE(_vstream_mgr); SAFE_DELETE(_external_scan_context_mgr); + SAFE_DELETE(_user_function_cache); + + _serial_download_cache_thread_token.reset(nullptr); + _download_cache_thread_pool.reset(nullptr); + + // _heartbeat_flags must be destoried after staroge engine SAFE_DELETE(_heartbeat_flags); - SAFE_DELETE(_scanner_scheduler); - SAFE_DELETE(_group_commit_mgr); - SAFE_DELETE(_file_meta_cache); + // Master Info is a thrift object, it could be the last one to deconstruct. // Master info should be deconstruct later than fragment manager, because fragment will // access master_info.backend id to access some info. If there is a running query and master // info is deconstructed then BE process will core at coordinator back method in fragment mgr. SAFE_DELETE(_master_info); - _new_load_stream_mgr.reset(); - _memtable_memory_limiter.reset(nullptr); - _send_batch_thread_pool.reset(nullptr); - _buffered_reader_prefetch_thread_pool.reset(nullptr); - _send_report_thread_pool.reset(nullptr); - _join_node_thread_pool.reset(nullptr); - _serial_download_cache_thread_token.reset(nullptr); - _download_cache_thread_pool.reset(nullptr); - _orphan_mem_tracker.reset(); - _experimental_mem_tracker.reset(); - _page_no_cache_mem_tracker.reset(); - _brpc_iobuf_block_memory_tracker.reset(); - InvertedIndexSearcherCache::reset_global_instance(); -} - -void ExecEnv::destroy(ExecEnv* env) { - env->_destroy(); + LOG(INFO) << "Doris exec envorinment is destoried."; } } // namespace doris diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp index 2a3dc92521..6c51cfccbe 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -50,7 +50,7 @@ ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) }); } -ExternalScanContextMgr::~ExternalScanContextMgr() { +void ExternalScanContextMgr::stop() { DEREGISTER_HOOK_METRIC(active_scan_context_count); _stop_background_threads_latch.count_down(); if (_keep_alive_reaper) { diff --git a/be/src/runtime/external_scan_context_mgr.h b/be/src/runtime/external_scan_context_mgr.h index 93ae8d360a..4925821f3a 100644 --- a/be/src/runtime/external_scan_context_mgr.h +++ b/be/src/runtime/external_scan_context_mgr.h @@ -52,7 +52,9 @@ public: class ExternalScanContextMgr { public: ExternalScanContextMgr(ExecEnv* exec_env); - ~ExternalScanContextMgr(); + ~ExternalScanContextMgr() = default; + + void stop(); Status create_scan_context(std::shared_ptr<ScanContext>* p_context); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 703019a39b..adbe401356 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -139,7 +139,9 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) CHECK(s.ok()) << s.to_string(); } -FragmentMgr::~FragmentMgr() { +FragmentMgr::~FragmentMgr() {} + +void FragmentMgr::stop() { DEREGISTER_HOOK_METRIC(plan_fragment_count); DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); _stop_background_threads_latch.count_down(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 0cf5cf2d58..14c63c559b 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -75,6 +75,8 @@ public: FragmentMgr(ExecEnv* exec_env); ~FragmentMgr() override; + void stop(); + // execute one plan fragment Status exec_plan_fragment(const TExecPlanFragmentParams& params); diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h index c16d63096f..a7e4b3f999 100644 --- a/be/src/runtime/frontend_info.h +++ b/be/src/runtime/frontend_info.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include <gen_cpp/HeartbeatService_types.h> diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 5c9b3c1637..94c0dba30a 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -362,7 +362,14 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { .build(&_insert_into_thread_pool); } -GroupCommitMgr::~GroupCommitMgr() {} +GroupCommitMgr::~GroupCommitMgr() { + LOG(INFO) << "GroupCommitMgr is destoried"; +} + +void GroupCommitMgr::stop() { + _insert_into_thread_pool->shutdown(); + LOG(INFO) << "GroupCommitMgr is stopped"; +} Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, const TDescriptorTable& tdesc_tbl, diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 0d5797ed96..1d124009d1 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -109,6 +109,8 @@ public: GroupCommitMgr(ExecEnv* exec_env); virtual ~GroupCommitMgr(); + void stop(); + // insert into Status group_commit_insert(int64_t table_id, const TPlan& plan, const TDescriptorTable& desc_tbl, diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 9987a34d5b..1aaf8772b8 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -73,13 +73,16 @@ LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) { } LoadChannelMgr::~LoadChannelMgr() { + delete _last_success_channel; +} + +void LoadChannelMgr::stop() { DEREGISTER_HOOK_METRIC(load_channel_count); DEREGISTER_HOOK_METRIC(load_channel_mem_consumption); _stop_background_threads_latch.count_down(); if (_load_channels_clean_thread) { _load_channels_clean_thread->join(); } - delete _last_success_channel; } Status LoadChannelMgr::init(int64_t process_mem_limit) { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 3c094a4251..a77c0b0cc2 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -63,6 +63,8 @@ public: // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); + void stop(); + private: Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, const UniqueId& load_id, const PTabletWriterAddBlockRequest& request); diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp index c9a5f18183..e241eeafea 100644 --- a/be/src/runtime/load_path_mgr.cpp +++ b/be/src/runtime/load_path_mgr.cpp @@ -56,7 +56,7 @@ LoadPathMgr::LoadPathMgr(ExecEnv* exec_env) _error_path_next_shard(0), _stop_background_threads_latch(1) {} -LoadPathMgr::~LoadPathMgr() { +void LoadPathMgr::stop() { _stop_background_threads_latch.count_down(); if (_clean_thread) { _clean_thread->join(); diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h index 73703517c6..de443f059b 100644 --- a/be/src/runtime/load_path_mgr.h +++ b/be/src/runtime/load_path_mgr.h @@ -39,9 +39,10 @@ class Thread; class LoadPathMgr { public: LoadPathMgr(ExecEnv* env); - ~LoadPathMgr(); + ~LoadPathMgr() = default; Status init(); + void stop(); Status allocate_dir(const std::string& db, const std::string& label, std::string* prefix); diff --git a/be/src/runtime/memory/cache_manager.h b/be/src/runtime/memory/cache_manager.h index fd7d5875b0..8fdce10d69 100644 --- a/be/src/runtime/memory/cache_manager.h +++ b/be/src/runtime/memory/cache_manager.h @@ -17,6 +17,7 @@ #pragma once +#include "runtime/exec_env.h" #include "runtime/memory/cache_policy.h" #include "util/runtime_profile.h" @@ -25,12 +26,11 @@ namespace doris { // Hold the list of all caches, for prune when memory not enough or timing. class CacheManager { public: - static void create_global_instance() { - DCHECK(_s_instance == nullptr); - static CacheManager instance; - _s_instance = &instance; + static CacheManager* create_global_instance() { + CacheManager* res = new CacheManager(); + return res; } - static CacheManager* instance() { return _s_instance; } + static CacheManager* instance() { return ExecEnv::GetInstance()->get_cache_manager(); } std::list<CachePolicy*>::iterator register_cache(CachePolicy* cache) { std::lock_guard<std::mutex> l(_caches_lock); @@ -55,8 +55,6 @@ public: void clear_once(CachePolicy::CacheType type); private: - static inline CacheManager* _s_instance = nullptr; - std::mutex _caches_lock; std::list<CachePolicy*> _caches; }; diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 32eeb702df..a3b99300f2 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -47,7 +47,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) { }); } -ResultBufferMgr::~ResultBufferMgr() { +void ResultBufferMgr::stop() { DEREGISTER_HOOK_METRIC(result_buffer_block_count); _stop_background_threads_latch.count_down(); if (_clean_thread) { diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index f164a14b93..8c9b621968 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -47,9 +47,12 @@ class Thread; class ResultBufferMgr { public: ResultBufferMgr(); - ~ResultBufferMgr(); + ~ResultBufferMgr() = default; // init Result Buffer Mgr, start cancel thread Status init(); + + void stop(); + // create one result sender for this query_id // the returned sender do not need release // sender is not used when call cancel or unregister diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 15baa2be6e..25dbc57fb7 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -41,7 +41,9 @@ public: DataConsumerPool(int64_t max_pool_size) : _max_pool_size(max_pool_size), _stop_background_threads_latch(1) {} - ~DataConsumerPool() { + ~DataConsumerPool() = default; + + void stop() { _stop_background_threads_latch.count_down(); if (_clean_idle_consumer_thread) { _clean_idle_consumer_thread->join(); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 72285930b4..e5c48f78d9 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -76,12 +76,15 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) } RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { + LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup"; + _task_map.clear(); +} + +void RoutineLoadTaskExecutor::stop() { DEREGISTER_HOOK_METRIC(routine_load_task_count); _thread_pool.shutdown(); _thread_pool.join(); - - LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup"; - _task_map.clear(); + _data_consumer_pool.stop(); } // Create a temp StreamLoadContext and set some kafka connection info in it. diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 90c1a06400..6714ce6902 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -51,6 +51,8 @@ public: ~RoutineLoadTaskExecutor(); + void stop(); + // submit a routine load task Status submit_task(const TRoutineLoadTask& task); diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 6ce6d31604..179bf8911a 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -28,9 +28,6 @@ namespace doris::taskgroup { -TaskGroupManager::TaskGroupManager() = default; -TaskGroupManager::~TaskGroupManager() = default; - TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { { std::shared_lock<std::shared_mutex> r_lock(_group_mutex); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 375208dc6e..baa6579b15 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -29,8 +29,8 @@ namespace taskgroup { class TaskGroupManager { public: - TaskGroupManager(); - ~TaskGroupManager(); + TaskGroupManager() = default; + ~TaskGroupManager() = default; TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 39507ec222..9f07ba5902 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -39,6 +39,7 @@ #include "http/http_client.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" +#include "runtime/exec_env.h" #include "util/dynamic_util.h" #include "util/md5.h" #include "util/spinlock.h" @@ -122,8 +123,7 @@ UserFunctionCache::~UserFunctionCache() { } UserFunctionCache* UserFunctionCache::instance() { - static UserFunctionCache s_cache; - return &s_cache; + return ExecEnv::GetInstance()->user_function_cache(); } Status UserFunctionCache::init(const std::string& lib_dir) { diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp index f35c49cbd2..57219f584e 100644 --- a/be/src/service/brpc_service.cpp +++ b/be/src/service/brpc_service.cpp @@ -86,8 +86,16 @@ Status BRpcService::start(int port, int num_threads) { } void BRpcService::join() { - _server->Stop(1000); - _server->Join(); + int stop_succeed = _server->Stop(1000); + + if (stop_succeed == 0) { + _server->Join(); + } else { + LOG(WARNING) << "Failed to stop brpc service, " + << "not calling brpc server join since it will never retrun." + << "maybe something bad will happen, let us know if you meet something error."; + } + _server->ClearServices(); } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index fd55c5a211..abfa7913e7 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -57,7 +57,6 @@ #include "common/daemon.h" #include "common/logging.h" #include "common/phdr_cache.h" -#include "common/resource_tls.h" #include "common/signal_handler.h" #include "common/status.h" #include "io/cache/block/block_file_cache_factory.h" @@ -449,49 +448,6 @@ int main(int argc, char** argv) { // Or our own sig-handler for SIGINT & SIGTERM will not be chained ... // https://www.oracle.com/java/technologies/javase/signals.html doris::init_signals(); - - // Load file cache before starting up daemon threads to make sure StorageEngine is read. - if (doris::config::enable_file_cache) { - doris::io::IFileCache::init(); - std::unordered_set<std::string> cache_path_set; - std::vector<doris::CachePath> cache_paths; - olap_res = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); - if (!olap_res) { - LOG(FATAL) << "parse config file cache path failed, path=" - << doris::config::file_cache_path; - exit(-1); - } - - std::unique_ptr<doris::ThreadPool> file_cache_init_pool; - doris::ThreadPoolBuilder("FileCacheInitThreadPool") - .set_min_threads(cache_paths.size()) - .set_max_threads(cache_paths.size()) - .build(&file_cache_init_pool); - - std::list<doris::Status> cache_status; - for (auto& cache_path : cache_paths) { - if (cache_path_set.find(cache_path.path) != cache_path_set.end()) { - LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path); - continue; - } - - RETURN_IF_ERROR(file_cache_init_pool->submit_func( - std::bind(&doris::io::FileCacheFactory::create_file_cache, - &(doris::io::FileCacheFactory::instance()), cache_path.path, - cache_path.init_settings(), &(cache_status.emplace_back())))); - - cache_path_set.emplace(cache_path.path); - } - - file_cache_init_pool->wait(); - for (const auto& status : cache_status) { - if (!status.ok()) { - LOG(FATAL) << "failed to init file cache, err: " << status; - exit(-1); - } - } - } - // ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon services // // Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo @@ -501,7 +457,6 @@ int main(int argc, char** argv) { doris::CpuInfo::init(); doris::DiskInfo::init(); doris::MemInfo::init(); - doris::UserFunctionCache::instance()->init(doris::config::user_function_dir); LOG(INFO) << doris::CpuInfo::debug_string(); LOG(INFO) << doris::DiskInfo::debug_string(); @@ -511,41 +466,17 @@ int main(int argc, char** argv) { // will work only after additional call of this function. // rewrites dl_iterate_phdr will cause Jemalloc to fail to run after enable profile. see # // updatePHDRCache(); - - doris::ResourceTls::init(); if (!doris::BackendOptions::init()) { exit(-1); } // init exec env - auto exec_env = doris::ExecEnv::GetInstance(); - doris::ExecEnv::init(exec_env, paths); - doris::TabletSchemaCache::create_global_schema_cache(); - - // init s3 write buffer pool - doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance(); - s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size, - doris::config::s3_write_buffer_size, - exec_env->buffered_reader_prefetch_thread_pool()); - - // init and open storage engine - doris::EngineOptions options; - options.store_paths = paths; - options.backend_uid = doris::UniqueId::gen_uid(); - std::unique_ptr<doris::StorageEngine> engine; - auto st = doris::StorageEngine::open(options, &engine); - if (!st.ok()) { - LOG(FATAL) << "fail to open StorageEngine, res=" << st; + auto exec_env(doris::ExecEnv::GetInstance()); + status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths); + if (status != Status::OK()) { + LOG(FATAL) << "failed to init doris storage engine, res=" << status; exit(-1); } - engine->set_heartbeat_flags(exec_env->heartbeat_flags()); - - // start all background threads of storage engine. - // SHOULD be called after exec env is initialized. - EXIT_IF_ERROR(engine->start_bg_threads()); - - doris::Daemon daemon; - daemon.start(); doris::telemetry::init_tracer(); @@ -563,8 +494,9 @@ int main(int argc, char** argv) { } // 2. bprc service - doris::BRpcService brpc_service(exec_env); - status = brpc_service.start(doris::config::brpc_port, doris::config::brpc_num_threads); + std::unique_ptr<doris::BRpcService> brpc_service = + std::make_unique<doris::BRpcService>(exec_env); + status = brpc_service->start(doris::config::brpc_port, doris::config::brpc_num_threads); if (!status.ok()) { LOG(ERROR) << "BRPC service did not start correctly, exiting"; doris::shutdown_logging(); @@ -572,9 +504,9 @@ int main(int argc, char** argv) { } // 3. http service - doris::HttpService http_service(exec_env, doris::config::webserver_port, - doris::config::webserver_num_workers); - status = http_service.start(); + std::unique_ptr<doris::HttpService> http_service = std::make_unique<doris::HttpService>( + exec_env, doris::config::webserver_port, doris::config::webserver_num_workers); + status = http_service->start(); if (!status.ok()) { LOG(ERROR) << "Doris Be http service did not start correctly, exiting"; doris::shutdown_logging(); @@ -605,6 +537,10 @@ int main(int argc, char** argv) { std::shared_ptr<doris::flight::FlightSqlServer> flight_server = std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); status = flight_server->init(doris::config::arrow_flight_port); + + // 6. start daemon thread to do clean or gc jobs + doris::Daemon daemon; + daemon.start(); if (!status.ok()) { LOG(ERROR) << "Arrow Flight Service did not start correctly, exiting, " << status.to_string(); @@ -618,10 +554,26 @@ int main(int argc, char** argv) { #endif sleep(3); } - + LOG(INFO) << "Doris main exiting."; // For graceful shutdown, need to wait for all running queries to stop exec_env->wait_for_all_tasks_done(); - + daemon.stop(); + flight_server.reset(); + LOG(INFO) << "Flight server stopped."; + heartbeat_thrift_server->stop(); + heartbeat_thrift_server.reset(nullptr); + LOG(INFO) << "Heartbeat server stopped"; + // TODO(zhiqiang): http_service + http_service->stop(); + http_service.reset(nullptr); + LOG(INFO) << "Http service stopped"; + be_server->stop(); + be_server.reset(nullptr); + LOG(INFO) << "Be server stopped"; + brpc_service.reset(nullptr); + LOG(INFO) << "Brpc service stopped"; + exec_env->destroy(); + LOG(INFO) << "Doris main exited."; return 0; } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index dcc3082972..fd8c640d20 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -239,8 +239,12 @@ Status HttpService::start() { } void HttpService::stop() { + if (stopped) { + return; + } _ev_http_server->stop(); _pool.clear(); + stopped = true; } } // namespace doris diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h index dea8c4141c..05d36fdd75 100644 --- a/be/src/service/http_service.h +++ b/be/src/service/http_service.h @@ -43,6 +43,8 @@ private: std::unique_ptr<EvHttpServer> _ev_http_server; std::unique_ptr<WebPageHandler> _web_page_handler; + + bool stopped = false; }; } // namespace doris diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index df762226a2..a1cf85f05a 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -33,6 +33,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_schema.h" +#include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "util/key_util.h" #include "util/runtime_profile.h" @@ -101,15 +102,12 @@ int64_t Reusable::mem_size() const { return _mem_size; } -LookupConnectionCache* LookupConnectionCache::_s_instance = nullptr; -void LookupConnectionCache::create_global_instance(size_t capacity) { - DCHECK(_s_instance == nullptr); - static LookupConnectionCache instance(capacity); - _s_instance = &instance; +LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) { + DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr); + LookupConnectionCache* res = new LookupConnectionCache(capacity); + return res; } -RowCache* RowCache::_s_instance = nullptr; - RowCache::RowCache(int64_t capacity, int num_shards) { // Create Row Cache _cache = std::unique_ptr<Cache>( @@ -117,14 +115,14 @@ RowCache::RowCache(int64_t capacity, int num_shards) { } // Create global instance of this class -void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { - DCHECK(_s_instance == nullptr); - static RowCache instance(capacity, num_shards); - _s_instance = &instance; +RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { + DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr); + RowCache* res = new RowCache(capacity, num_shards); + return res; } RowCache* RowCache::instance() { - return _s_instance; + return ExecEnv::GetInstance()->get_row_cache(); } bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 180af54fdd..4e51217c3b 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -46,6 +46,7 @@ #include "olap/tablet.h" #include "olap/utils.h" #include "runtime/descriptors.h" +#include "runtime/exec_env.h" #include "util/mysql_global.h" #include "util/runtime_profile.h" #include "util/slice.h" @@ -161,7 +162,7 @@ public: }; // Create global instance of this class - static void create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); + static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards); static RowCache* instance(); @@ -183,7 +184,6 @@ public: private: static constexpr uint32_t kDefaultNumShards = 128; RowCache(int64_t capacity, int num_shards = kDefaultNumShards); - static RowCache* _s_instance; std::unique_ptr<Cache> _cache = nullptr; }; @@ -191,9 +191,11 @@ private: // One connection per stmt perf uuid class LookupConnectionCache : public LRUCachePolicy { public: - static LookupConnectionCache* instance() { return _s_instance; } + static LookupConnectionCache* instance() { + return ExecEnv::GetInstance()->get_lookup_connection_cache(); + } - static void create_global_instance(size_t capacity); + static LookupConnectionCache* create_global_instance(size_t capacity); private: friend class PointQueryExecutor; @@ -240,8 +242,6 @@ private: struct CacheValue : public LRUCacheValueBase { std::shared_ptr<Reusable> item = nullptr; }; - - static LookupConnectionCache* _s_instance; }; struct Metrics { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 87903e480a..439a843c3a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -61,6 +61,17 @@ ScannerScheduler::~ScannerScheduler() { return; } + for (int i = 0; i < QUEUE_NUM; i++) { + delete _pending_queues[i]; + } + delete[] _pending_queues; +} + +void ScannerScheduler::stop() { + if (!_is_init) { + return; + } + for (int i = 0; i < QUEUE_NUM; i++) { _pending_queues[i]->shutdown(); } @@ -80,13 +91,10 @@ ScannerScheduler::~ScannerScheduler() { _limited_scan_thread_pool->wait(); _group_local_scan_thread_pool->wait(); - for (int i = 0; i < QUEUE_NUM; i++) { - delete _pending_queues[i]; - } - delete[] _pending_queues; + LOG(INFO) << "ScannerScheduler stopped"; } -Status ScannerScheduler::init(ExecEnv* env) { +Status ScannerScheduler::init() { // 1. scheduling thread pool and scheduling queues ThreadPoolBuilder("SchedulingThreadPool") .set_min_threads(QUEUE_NUM) diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index e669fd9b77..366275eb41 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -64,10 +64,12 @@ public: ScannerScheduler(); ~ScannerScheduler(); - Status init(ExecEnv* env); + [[nodiscard]] Status init(); [[nodiscard]] Status submit(ScannerContext* ctx); + void stop(); + std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, int max_concurrency); taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() { diff --git a/be/test/common/resource_tls_test.cpp b/be/test/common/resource_tls_test.cpp deleted file mode 100644 index e09227eb03..0000000000 --- a/be/test/common/resource_tls_test.cpp +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "common/resource_tls.h" - -#include <gtest/gtest-message.h> -#include <gtest/gtest-test-part.h> - -#include <memory> - -#include "gen_cpp/Types_types.h" -#include "gtest/gtest_pred_impl.h" - -namespace doris { - -class ResourceTlsTest : public testing::Test {}; - -TEST_F(ResourceTlsTest, EmptyTest) { - EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr); - EXPECT_TRUE(ResourceTls::set_resource_tls((TResourceInfo*)1) != 0); -} - -TEST_F(ResourceTlsTest, NormalTest) { - ResourceTls::init(); - EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr); - TResourceInfo* info = new TResourceInfo(); - info->user = "testUser"; - info->group = "testGroup"; - EXPECT_TRUE(ResourceTls::set_resource_tls(info) == 0); - TResourceInfo* getInfo = ResourceTls::get_resource_tls(); - EXPECT_STREQ("testUser", getInfo->user.c_str()); - EXPECT_STREQ("testGroup", getInfo->group.c_str()); -} - -} // namespace doris diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp b/be/test/olap/delete_bitmap_calculator_test.cpp index 6e842edc36..53bdb9c0fb 100644 --- a/be/test/olap/delete_bitmap_calculator_test.cpp +++ b/be/test/olap/delete_bitmap_calculator_test.cpp @@ -40,6 +40,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" #include "olap/tablet_schema_helper.h" +#include "runtime/exec_env.h" namespace doris { using namespace ErrorCode; @@ -73,7 +74,7 @@ public: EXPECT_TRUE(io::global_local_filesystem()->delete_and_create_directory(kSegmentDir).ok()); doris::EngineOptions options; k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; + ExecEnv::GetInstance()->set_storage_engine(k_engine); } void TearDown() override { @@ -82,6 +83,7 @@ public: k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } } diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index 605e3e389c..b338320d0f 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -29,6 +29,7 @@ #include <cstdlib> #include <iostream> +#include <memory> #include <string> #include <vector> @@ -46,6 +47,7 @@ #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" +#include "runtime/exec_env.h" #include "util/cpu_info.h" using namespace std; @@ -78,8 +80,10 @@ static void set_up() { doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); + ExecEnv::GetInstance()->set_storage_engine(k_engine.get()); } static void tear_down() { @@ -90,6 +94,7 @@ static void tear_down() { EXPECT_TRUE(io::global_local_filesystem() ->delete_directory(string(getenv("DORIS_HOME")) + "/" + UNUSED_PREFIX) .ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); k_engine.reset(); } diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index a8dfa4a760..6fc57fd15d 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -79,17 +79,20 @@ static void set_up() { doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); + exec_env->set_storage_engine(k_engine.get()); k_engine->start_bg_threads(); } static void tear_down() { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_memtable_memory_limiter(nullptr); + exec_env->set_storage_engine(nullptr); k_engine.reset(); EXPECT_EQ(system("rm -rf ./data_test"), 0); io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" + diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 706cb74979..037e2a70c7 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -77,16 +77,19 @@ static void set_up() { doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); ExecEnv* exec_env = doris::ExecEnv::GetInstance(); - k_engine->start_bg_threads(); exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); + exec_env->set_storage_engine(k_engine.get()); + k_engine->start_bg_threads(); } static void tear_down() { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_memtable_memory_limiter(nullptr); + exec_env->set_storage_engine(nullptr); k_engine.reset(); EXPECT_EQ(system("rm -rf ./data_test_1"), 0); EXPECT_EQ(system("rm -rf ./data_test_2"), 0); diff --git a/be/test/olap/memtable_flush_executor_test.cpp b/be/test/olap/memtable_flush_executor_test.cpp index efe95e36a3..283b010b76 100644 --- a/be/test/olap/memtable_flush_executor_test.cpp +++ b/be/test/olap/memtable_flush_executor_test.cpp @@ -55,13 +55,15 @@ void set_up() { doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); - + ExecEnv::GetInstance()->set_storage_engine(k_engine.get()); k_flush_executor = k_engine->memtable_flush_executor(); } void tear_down() { + ExecEnv::GetInstance()->set_storage_engine(nullptr); k_engine.reset(); system("rm -rf ./flush_test"); EXPECT_TRUE(io::global_local_filesystem() diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp index 187fc09cb4..22a8421fd9 100644 --- a/be/test/olap/memtable_memory_limiter_test.cpp +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -85,15 +85,23 @@ protected: doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &_engine); + _engine = std::make_unique<StorageEngine>(options); + Status st = _engine->open(); + EXPECT_TRUE(st.ok()) << st.to_string(); + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); - _engine->start_bg_threads(); + // ExecEnv's storage_engine will be read by storage_engine's other operations. + // So we must do this before storage engine's other operation. + exec_env->set_storage_engine(_engine.get()); exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); + _engine->start_bg_threads(); } void TearDown() override { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_memtable_memory_limiter(nullptr); + exec_env->set_storage_engine(nullptr); + _engine.reset(nullptr); EXPECT_EQ(system("rm -rf ./data_test"), 0); io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME")) + "/" + UNUSED_PREFIX); diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp index ab0562a3ac..b53689c229 100644 --- a/be/test/olap/ordered_data_compaction_test.cpp +++ b/be/test/olap/ordered_data_compaction_test.cpp @@ -61,6 +61,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "runtime/exec_env.h" #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/core/block.h" @@ -89,8 +90,7 @@ protected: _data_dir->update_capacity(); doris::EngineOptions options; k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; - + ExecEnv::GetInstance()->set_storage_engine(k_engine); config::enable_ordered_data_compaction = true; config::ordered_data_compaction_min_segment_size = 10; } @@ -100,6 +100,7 @@ protected: k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } } diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 95b34cf0f8..6592cb53dd 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -54,6 +54,7 @@ #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" +#include "runtime/exec_env.h" #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/core/block.h" @@ -77,7 +78,7 @@ protected: .ok()); doris::EngineOptions options; k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; + ExecEnv::GetInstance()->set_storage_engine(k_engine); } void TearDown() override { @@ -86,6 +87,7 @@ protected: k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } } diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 5e259745d6..81e587370a 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -100,13 +100,18 @@ public: doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); + ExecEnv::GetInstance()->set_storage_engine(k_engine.get()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(kTestDir).ok()); } - static void TearDownTestSuite() { k_engine.reset(); } + static void TearDownTestSuite() { + ExecEnv::GetInstance()->set_storage_engine(nullptr); + k_engine.reset(); + } protected: OlapReaderStatistics _stats; diff --git a/be/test/olap/rowset/rowset_meta_manager_test.cpp b/be/test/olap/rowset/rowset_meta_manager_test.cpp index a747d1fa2c..044b95ff97 100644 --- a/be/test/olap/rowset/rowset_meta_manager_test.cpp +++ b/be/test/olap/rowset/rowset_meta_manager_test.cpp @@ -36,6 +36,7 @@ #include "olap/olap_meta.h" #include "olap/options.h" #include "olap/storage_engine.h" +#include "runtime/exec_env.h" #include "util/uid_util.h" using ::testing::_; @@ -60,6 +61,7 @@ public: if (k_engine == nullptr) { k_engine = new StorageEngine(options); } + ExecEnv::GetInstance()->set_storage_engine(k_engine); std::string meta_path = "./meta"; EXPECT_TRUE(std::filesystem::create_directory(meta_path)); @@ -83,6 +85,7 @@ public: virtual void TearDown() { SAFE_DELETE(_meta); + ExecEnv::GetInstance()->set_storage_engine(nullptr); SAFE_DELETE(k_engine); EXPECT_TRUE(std::filesystem::remove_all("./meta")); LOG(INFO) << "TearDown"; diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 03fa82c3cc..e475791da3 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -248,13 +248,17 @@ public: EngineOptions options; options.store_paths = paths; - doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + auto st = k_engine->open(); + EXPECT_TRUE(st.ok()) << st.to_string(); ExecEnv* exec_env = doris::ExecEnv::GetInstance(); exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); + exec_env->set_storage_engine(k_engine.get()); } static void TearDownTestSuite() { ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(nullptr); exec_env->set_memtable_memory_limiter(nullptr); k_engine.reset(); } diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 5954d0329f..a9a4bb940d 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -41,6 +41,7 @@ #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" +#include "runtime/exec_env.h" #include "util/uid_util.h" using ::testing::_; @@ -66,6 +67,7 @@ public: // won't open engine, options.path is needless options.backend_uid = UniqueId::gen_uid(); k_engine = new StorageEngine(options); + ExecEnv::GetInstance()->set_storage_engine(k_engine); _data_dir = new DataDir(_engine_data_path, 1000000000); _data_dir->init(); _tablet_mgr = k_engine->tablet_manager(); @@ -77,6 +79,7 @@ public: if (k_engine != nullptr) { k_engine->stop(); } + ExecEnv::GetInstance()->set_storage_engine(nullptr); SAFE_DELETE(k_engine); _tablet_mgr = nullptr; } diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index 838db9b0d6..3fc32d3b3d 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -34,6 +34,7 @@ #include "olap/storage_policy.h" #include "olap/tablet_meta.h" #include "olap/utils.h" +#include "runtime/exec_env.h" #include "testutil/mock_rowset.h" #include "util/time.h" #include "util/uid_util.h" @@ -86,7 +87,7 @@ public: doris::EngineOptions options; k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; + ExecEnv::GetInstance()->set_storage_engine(k_engine); } void TearDown() override { @@ -95,6 +96,7 @@ public: k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } } diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index 2d27576b5a..fe05e206c7 100644 --- a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -113,6 +113,7 @@ public: if (k_engine == nullptr) { k_engine = new StorageEngine(options); } + ExecEnv::GetInstance()->set_storage_engine(k_engine); std::string meta_path = "./meta"; std::filesystem::remove_all("./meta"); diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index ba4febda10..c313916476 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -38,13 +38,11 @@ public: _exec_env._fragment_mgr = fragment_mgr; _exec_env._result_queue_mgr = result_queue_mgr; } - virtual ~ExternalScanContextMgrTest() { - delete _exec_env._fragment_mgr; - delete _exec_env._result_queue_mgr; - } + ~ExternalScanContextMgrTest() = default; protected: - virtual void SetUp() {} + void SetUp() override { _exec_env.set_ready(); } + void TearDown() override { _exec_env.destroy(); } private: ExecEnv _exec_env; diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index dae8b93e19..f3c9b55cd7 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -29,6 +29,7 @@ #include <unistd.h> #include <functional> +#include <memory> #include "common/config.h" #include "common/status.h" @@ -582,10 +583,10 @@ public: doris::EngineOptions options; options.store_paths = paths; - Status s = doris::StorageEngine::open(options, &k_engine); + k_engine = std::make_unique<StorageEngine>(options); + Status s = k_engine->open(); EXPECT_TRUE(s.ok()) << s.to_string(); - - _env = doris::ExecEnv::GetInstance(); + doris::ExecEnv::GetInstance()->set_storage_engine(k_engine.get()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok()); @@ -611,6 +612,7 @@ public: } void TearDown() override { + ExecEnv::GetInstance()->set_storage_engine(nullptr); k_engine.reset(); _server->Stop(1000); _load_stream_mgr.reset(); diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index f1088300cd..8a8dcc4d67 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -93,7 +93,6 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); RoutineLoadTaskExecutor executor(&_env); - // submit task Status st; st = executor.submit_task(task); @@ -116,6 +115,8 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); st = executor.submit_task(task); EXPECT_TRUE(st.ok()); + + executor.stop(); } -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 2e735ce8fa..de062fa930 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -39,10 +39,13 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); - doris::CacheManager::create_global_instance(); - doris::TabletSchemaCache::create_global_schema_cache(); - doris::StoragePageCache::create_global_cache(1 << 30, 10, 0); - doris::SegmentLoader::create_global_instance(1000); + doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance()); + doris::ExecEnv::GetInstance()->set_tablet_schema_cache( + doris::TabletSchemaCache::create_global_schema_cache()); + doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start(); + doris::ExecEnv::GetInstance()->set_storage_page_cache( + doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); + doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000)); std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; if (!doris::config::init(conf.c_str(), false)) { fprintf(stderr, "error read config file. \n"); @@ -54,5 +57,7 @@ int main(int argc, char** argv) { doris::DiskInfo::init(); doris::MemInfo::init(); doris::BackendOptions::init(); - return RUN_ALL_TESTS(); + int res = RUN_ALL_TESTS(); + doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->stop(); + return res; } diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 14a202845a..74f11d7e7d 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -61,6 +61,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "runtime/exec_env.h" #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/core/block.h" @@ -93,7 +94,7 @@ protected: doris::EngineOptions options; k_engine = new StorageEngine(options); - StorageEngine::_s_instance = k_engine; + ExecEnv::GetInstance()->set_storage_engine(k_engine); } void TearDown() override { SAFE_DELETE(_data_dir); @@ -102,6 +103,7 @@ protected: k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org